@@ -26,16 +26,20 @@ import (
26
26
"github.com/sirupsen/logrus"
27
27
)
28
28
29
- // DiscoveryManager is required to handle multiple pluggable-discovery that
30
- // may be shared across platforms
29
+ // DiscoveryManager manages the many-to-many communication between all pluggable
30
+ // discoveries and all watchers. Each PluggableDiscovery, once started, will
31
+ // produce a sequence of "events". These events will be broadcasted to all
32
+ // listening Watcher.
33
+ // The DiscoveryManager will not start the discoveries until the Start method
34
+ // is called.
31
35
type DiscoveryManager struct {
32
36
discoveriesMutex sync.Mutex
33
- discoveries map [string ]* discovery.PluggableDiscovery
34
- discoveriesRunning bool
35
- feed chan * discovery.Event
37
+ discoveries map [string ]* discovery.PluggableDiscovery // all registered PluggableDiscovery
38
+ discoveriesRunning bool // set to true once discoveries are started
39
+ feed chan * discovery.Event // all events will pass through this channel
36
40
watchersMutex sync.Mutex
37
- watchers map [* PortWatcher ]bool
38
- watchersCache map [string ]map [string ]* discovery.Event
41
+ watchers map [* PortWatcher ]bool // all registered Watcher
42
+ watchersCache map [string ]map [string ]* discovery.Event // this is a cache of all active ports
39
43
}
40
44
41
45
var tr = i18n .Tr
@@ -85,7 +89,7 @@ func (dm *DiscoveryManager) Start() {
85
89
}
86
90
87
91
go func () {
88
- // Feed all watchers with data coming from the discoveries
92
+ // Send all events coming from the feed channel to all active watchers
89
93
for ev := range dm .feed {
90
94
dm .feedEvent (ev )
91
95
}
@@ -152,11 +156,13 @@ func (dm *DiscoveryManager) Watch() (*PortWatcher, error) {
152
156
}
153
157
go func () {
154
158
dm .watchersMutex .Lock ()
159
+ // When a watcher is started, send all the current active ports first...
155
160
for _ , cache := range dm .watchersCache {
156
161
for _ , ev := range cache {
157
162
watcher .feed <- ev
158
163
}
159
164
}
165
+ // ...and after that add the watcher to the list of watchers receiving events
160
166
dm .watchers [watcher ] = true
161
167
dm .watchersMutex .Unlock ()
162
168
}()
@@ -165,6 +171,7 @@ func (dm *DiscoveryManager) Watch() (*PortWatcher, error) {
165
171
166
172
func (dm * DiscoveryManager ) startDiscovery (d * discovery.PluggableDiscovery ) (discErr error ) {
167
173
defer func () {
174
+ // If this function returns an error log it
168
175
if discErr != nil {
169
176
logrus .Errorf ("Discovery %s failed to run: %s" , d .GetID (), discErr )
170
177
}
@@ -181,6 +188,7 @@ func (dm *DiscoveryManager) startDiscovery(d *discovery.PluggableDiscovery) (dis
181
188
// XXX do better cleanup if the discovery fails to start
182
189
183
190
go func () {
191
+ // Transfer all incoming events from this discovery to the feed channel
184
192
for ev := range eventCh {
185
193
dm .feed <- ev
186
194
}
0 commit comments