-
Notifications
You must be signed in to change notification settings - Fork 76
/
Copy pathwatch.go
109 lines (102 loc) · 2.48 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package client
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"strings"
)
// Result is a watch result
type Result struct {
Type string
Object interface{}
}
const Added = "ADDED"
const Modified = "MODIFIED"
const Deleted = "DELETED"
// WatchClient is a client for Watching the Kubernetes API
type WatchClient struct {
Cfg *Configuration
Client *APIClient
Path string
MakerFn func() interface{}
}
// Connect initiates a watch to the server.
func (w *WatchClient) Connect(ctx context.Context, resourceVersion string) (<-chan *Result, <-chan error, error) {
params := []string{"watch=true"}
if len(resourceVersion) != 0 {
params = append(params, "resourceVersion="+resourceVersion)
}
queryStr := "?" + strings.Join(params, "&")
url := w.Cfg.Scheme + "://" + w.Cfg.Host + w.Path + queryStr
req, err := w.Client.prepareRequest(ctx, url, "GET", nil, nil, nil, nil, "", []byte{})
if err != nil {
return nil, nil, err
}
res, err := w.Client.callAPI(req)
if err != nil {
return nil, nil, err
}
if res.StatusCode != 200 {
return nil, nil, fmt.Errorf("Error connecting watch (%d: %s)", res.StatusCode, res.Status)
}
resultChan := make(chan *Result, 1)
errChan := make(chan error, 1)
processWatch(res.Body, w.MakerFn, resultChan, errChan)
return resultChan, errChan, nil
}
func processWatch(stream io.Reader, makerFn func() interface{}, resultChan chan<- *Result, errChan chan<- error) {
scanner := bufio.NewScanner(stream)
go func() {
defer close(resultChan)
defer close(errChan)
for scanner.Scan() {
watchObj, err := decode(scanner.Text(), makerFn)
if err != nil {
errChan <- err
return
}
if watchObj != nil {
resultChan <- watchObj
}
}
if err := scanner.Err(); err != nil {
errChan <- err
}
}()
}
func decode(line string, makerFn func() interface{}) (*Result, error) {
if len(line) == 0 {
return nil, nil
}
// TODO: support protocol buffer encoding?
decoder := json.NewDecoder(strings.NewReader(line))
result := &Result{}
for decoder.More() {
name, err := decoder.Token()
if err != nil {
return nil, err
}
if name == "type" {
token, err := decoder.Token()
if err != nil {
return nil, err
}
var ok bool
result.Type, ok = token.(string)
if !ok {
return nil, fmt.Errorf("Error casting %v to string", token)
}
}
if name == "object" {
obj := makerFn()
if err := decoder.Decode(&obj); err != nil {
return nil, err
}
result.Object = obj
return result, nil
}
}
return nil, nil
}