forked from eclipse-paho/paho.mqtt.golang
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtoken.go
90 lines (78 loc) · 1.64 KB
/
token.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/*
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Allan Stockdill-Mander
*/
package mqtt
import (
. "github.com/alsm/hrotti/packets"
"sync"
"time"
)
// PacketAndToken pairs a control packet with the Token associated with it.
type PacketAndToken struct {
// p is the control packet half of the pair.
p ControlPacket
// t is the Token half of the pair.
t Token
}
// Token is the interface implemented by the token types in this file. All three
// methods are supplied by baseToken, which the concrete token types embed.
type Token interface {
// Wait blocks until the token completes.
Wait()
// WaitTimeout waits for the token to complete, giving up after the given
// duration. It returns nothing, so it does not report which of the two happened.
WaitTimeout(time.Duration)
// flowComplete marks the token as complete. It is unexported, so only types
// declared in this package can satisfy Token.
flowComplete()
}
// baseToken carries the completion state shared by the concrete token types,
// which embed it to obtain Wait, WaitTimeout and flowComplete.
type baseToken struct {
	m        sync.RWMutex  // guards ready; embedding types also use it for their own fields
	complete chan struct{} // closed by flowComplete to release every waiter
	ready    bool          // set once a waiter has observed completion
}

// Wait blocks until the Token completes, i.e. until the publish has been sent
// and its receipt confirmed by the broker.
//
// The mutex is NOT held while blocking: a closed channel is already safe to
// receive from concurrently, and holding the lock here would stall every other
// waiter (and any reader of m) until completion.
func (b *baseToken) Wait() {
	<-b.complete

	b.m.Lock()
	b.ready = true
	b.m.Unlock()
}

// WaitTimeout waits for the Token to complete for at most d (a time.Duration,
// not a bare millisecond count). It returns when the token completes or when d
// elapses, whichever comes first; it does not report which one happened.
//
// As with Wait, the mutex is only taken to record completion, so the timeout is
// honoured even while other goroutines are blocked in Wait or WaitTimeout.
func (b *baseToken) WaitTimeout(d time.Duration) {
	timer := time.NewTimer(d)
	// Release the timer promptly when completion wins the race.
	defer timer.Stop()

	select {
	case <-b.complete:
		b.m.Lock()
		b.ready = true
		b.m.Unlock()
	case <-timer.C:
	}
}

// flowComplete marks the token as complete by closing the completion channel,
// which releases all current and future waiters. It must be called at most
// once per token: closing an already-closed channel panics.
func (b *baseToken) flowComplete() {
	close(b.complete)
}
// newToken builds the Token that corresponds to a control packet type.
//
// SUBSCRIBE yields a *SubscribeToken with an empty result map and PUBLISH
// yields a *PublishToken; both start with a fresh, unclosed completion channel.
// Any other tType yields nil.
func newToken(tType byte) Token {
	if tType == SUBSCRIBE {
		sub := &SubscribeToken{subResult: make(map[string]byte)}
		sub.complete = make(chan struct{})
		return sub
	}
	if tType == PUBLISH {
		pub := &PublishToken{}
		pub.complete = make(chan struct{})
		return pub
	}
	return nil
}
// PublishToken is the Token returned by newToken for PUBLISH. It adds no state
// of its own; all of its behaviour comes from the embedded baseToken.
type PublishToken struct {
baseToken
}
// SubscribeToken is the Token returned by newToken for SUBSCRIBE. It embeds
// baseToken for the waiting behaviour and additionally carries the
// subscription data exposed through Result.
type SubscribeToken struct {
baseToken
// subs: presumably the topic filters being subscribed to — not populated in
// this file; confirm against the caller.
subs []string
// subResult is the map handed back by Result. Presumably keyed by topic with
// a per-topic result byte — verify against the code that fills it.
subResult map[string]byte
}
// Result returns the token's subscription result map.
//
// The read lock covers only the fetch of the map field. The value returned is
// the token's own map, not a copy.
func (s *SubscribeToken) Result() map[string]byte {
	s.m.RLock()
	res := s.subResult
	s.m.RUnlock()
	return res
}