@@ -98,19 +98,25 @@ var ConnErrors = map[byte]error{
98
98
//to read an MQTT packet from the stream. It returns a ControlPacket
99
99
//representing the decoded MQTT packet and an error. One of these returns will
100
100
//always be nil, a nil ControlPacket indicating an error occurred.
101
- func ReadPacket (r io.Reader ) (cp ControlPacket , err error ) {
101
+ func ReadPacket (r io.Reader ) (ControlPacket , error ) {
102
102
var fh FixedHeader
103
103
b := make ([]byte , 1 )
104
104
105
- _ , err = io .ReadFull (r , b )
105
+ _ , err : = io .ReadFull (r , b )
106
106
if err != nil {
107
107
return nil , err
108
108
}
109
- fh .unpack (b [0 ], r )
110
- cp = NewControlPacketWithHeader (fh )
111
- if cp == nil {
112
- return nil , errors .New ("Bad data from client" )
109
+
110
+ err = fh .unpack (b [0 ], r )
111
+ if err != nil {
112
+ return nil , err
113
+ }
114
+
115
+ cp , err := NewControlPacketWithHeader (fh )
116
+ if err != nil {
117
+ return nil , err
113
118
}
119
+
114
120
packetBytes := make ([]byte , fh .RemainingLength )
115
121
n , err := io .ReadFull (r , packetBytes )
116
122
if err != nil {
@@ -128,79 +134,75 @@ func ReadPacket(r io.Reader) (cp ControlPacket, err error) {
128
134
//by packetType, this is usually done by reference to the packet type constants
129
135
//defined in packets.go. The newly created ControlPacket is empty and a pointer
130
136
//is returned.
131
- func NewControlPacket (packetType byte ) ( cp ControlPacket ) {
137
+ func NewControlPacket (packetType byte ) ControlPacket {
132
138
switch packetType {
133
139
case Connect :
134
- cp = & ConnectPacket {FixedHeader : FixedHeader {MessageType : Connect }}
140
+ return & ConnectPacket {FixedHeader : FixedHeader {MessageType : Connect }}
135
141
case Connack :
136
- cp = & ConnackPacket {FixedHeader : FixedHeader {MessageType : Connack }}
142
+ return & ConnackPacket {FixedHeader : FixedHeader {MessageType : Connack }}
137
143
case Disconnect :
138
- cp = & DisconnectPacket {FixedHeader : FixedHeader {MessageType : Disconnect }}
144
+ return & DisconnectPacket {FixedHeader : FixedHeader {MessageType : Disconnect }}
139
145
case Publish :
140
- cp = & PublishPacket {FixedHeader : FixedHeader {MessageType : Publish }}
146
+ return & PublishPacket {FixedHeader : FixedHeader {MessageType : Publish }}
141
147
case Puback :
142
- cp = & PubackPacket {FixedHeader : FixedHeader {MessageType : Puback }}
148
+ return & PubackPacket {FixedHeader : FixedHeader {MessageType : Puback }}
143
149
case Pubrec :
144
- cp = & PubrecPacket {FixedHeader : FixedHeader {MessageType : Pubrec }}
150
+ return & PubrecPacket {FixedHeader : FixedHeader {MessageType : Pubrec }}
145
151
case Pubrel :
146
- cp = & PubrelPacket {FixedHeader : FixedHeader {MessageType : Pubrel , Qos : 1 }}
152
+ return & PubrelPacket {FixedHeader : FixedHeader {MessageType : Pubrel , Qos : 1 }}
147
153
case Pubcomp :
148
- cp = & PubcompPacket {FixedHeader : FixedHeader {MessageType : Pubcomp }}
154
+ return & PubcompPacket {FixedHeader : FixedHeader {MessageType : Pubcomp }}
149
155
case Subscribe :
150
- cp = & SubscribePacket {FixedHeader : FixedHeader {MessageType : Subscribe , Qos : 1 }}
156
+ return & SubscribePacket {FixedHeader : FixedHeader {MessageType : Subscribe , Qos : 1 }}
151
157
case Suback :
152
- cp = & SubackPacket {FixedHeader : FixedHeader {MessageType : Suback }}
158
+ return & SubackPacket {FixedHeader : FixedHeader {MessageType : Suback }}
153
159
case Unsubscribe :
154
- cp = & UnsubscribePacket {FixedHeader : FixedHeader {MessageType : Unsubscribe , Qos : 1 }}
160
+ return & UnsubscribePacket {FixedHeader : FixedHeader {MessageType : Unsubscribe , Qos : 1 }}
155
161
case Unsuback :
156
- cp = & UnsubackPacket {FixedHeader : FixedHeader {MessageType : Unsuback }}
162
+ return & UnsubackPacket {FixedHeader : FixedHeader {MessageType : Unsuback }}
157
163
case Pingreq :
158
- cp = & PingreqPacket {FixedHeader : FixedHeader {MessageType : Pingreq }}
164
+ return & PingreqPacket {FixedHeader : FixedHeader {MessageType : Pingreq }}
159
165
case Pingresp :
160
- cp = & PingrespPacket {FixedHeader : FixedHeader {MessageType : Pingresp }}
161
- default :
162
- return nil
166
+ return & PingrespPacket {FixedHeader : FixedHeader {MessageType : Pingresp }}
163
167
}
164
- return cp
168
+ return nil
165
169
}
166
170
167
171
//NewControlPacketWithHeader is used to create a new ControlPacket of the type
168
172
//specified within the FixedHeader that is passed to the function.
169
173
//The newly created ControlPacket is empty and a pointer is returned.
170
- func NewControlPacketWithHeader (fh FixedHeader ) (cp ControlPacket ) {
174
+ func NewControlPacketWithHeader (fh FixedHeader ) (ControlPacket , error ) {
171
175
switch fh .MessageType {
172
176
case Connect :
173
- cp = & ConnectPacket {FixedHeader : fh }
177
+ return & ConnectPacket {FixedHeader : fh }, nil
174
178
case Connack :
175
- cp = & ConnackPacket {FixedHeader : fh }
179
+ return & ConnackPacket {FixedHeader : fh }, nil
176
180
case Disconnect :
177
- cp = & DisconnectPacket {FixedHeader : fh }
181
+ return & DisconnectPacket {FixedHeader : fh }, nil
178
182
case Publish :
179
- cp = & PublishPacket {FixedHeader : fh }
183
+ return & PublishPacket {FixedHeader : fh }, nil
180
184
case Puback :
181
- cp = & PubackPacket {FixedHeader : fh }
185
+ return & PubackPacket {FixedHeader : fh }, nil
182
186
case Pubrec :
183
- cp = & PubrecPacket {FixedHeader : fh }
187
+ return & PubrecPacket {FixedHeader : fh }, nil
184
188
case Pubrel :
185
- cp = & PubrelPacket {FixedHeader : fh }
189
+ return & PubrelPacket {FixedHeader : fh }, nil
186
190
case Pubcomp :
187
- cp = & PubcompPacket {FixedHeader : fh }
191
+ return & PubcompPacket {FixedHeader : fh }, nil
188
192
case Subscribe :
189
- cp = & SubscribePacket {FixedHeader : fh }
193
+ return & SubscribePacket {FixedHeader : fh }, nil
190
194
case Suback :
191
- cp = & SubackPacket {FixedHeader : fh }
195
+ return & SubackPacket {FixedHeader : fh }, nil
192
196
case Unsubscribe :
193
- cp = & UnsubscribePacket {FixedHeader : fh }
197
+ return & UnsubscribePacket {FixedHeader : fh }, nil
194
198
case Unsuback :
195
- cp = & UnsubackPacket {FixedHeader : fh }
199
+ return & UnsubackPacket {FixedHeader : fh }, nil
196
200
case Pingreq :
197
- cp = & PingreqPacket {FixedHeader : fh }
201
+ return & PingreqPacket {FixedHeader : fh }, nil
198
202
case Pingresp :
199
- cp = & PingrespPacket {FixedHeader : fh }
200
- default :
201
- return nil
203
+ return & PingrespPacket {FixedHeader : fh }, nil
202
204
}
203
- return cp
205
+ return nil , fmt . Errorf ( "unsupported packet type 0x%x" , fh . MessageType )
204
206
}
205
207
206
208
//Details struct returned by the Details() function called on
@@ -241,24 +243,34 @@ func (fh *FixedHeader) pack() bytes.Buffer {
241
243
return header
242
244
}
243
245
244
- func (fh * FixedHeader ) unpack (typeAndFlags byte , r io.Reader ) {
246
+ func (fh * FixedHeader ) unpack (typeAndFlags byte , r io.Reader ) error {
245
247
fh .MessageType = typeAndFlags >> 4
246
248
fh .Dup = (typeAndFlags >> 3 )& 0x01 > 0
247
249
fh .Qos = (typeAndFlags >> 1 ) & 0x03
248
250
fh .Retain = typeAndFlags & 0x01 > 0
249
- fh .RemainingLength = decodeLength (r )
251
+
252
+ var err error
253
+ fh .RemainingLength , err = decodeLength (r )
254
+ return err
250
255
}
251
256
252
- func decodeByte (b io.Reader ) byte {
257
+ func decodeByte (b io.Reader ) ( byte , error ) {
253
258
num := make ([]byte , 1 )
254
- b .Read (num )
255
- return num [0 ]
259
+ _ , err := b .Read (num )
260
+ if err != nil {
261
+ return 0 , err
262
+ }
263
+
264
+ return num [0 ], nil
256
265
}
257
266
258
- func decodeUint16 (b io.Reader ) uint16 {
267
+ func decodeUint16 (b io.Reader ) ( uint16 , error ) {
259
268
num := make ([]byte , 2 )
260
- b .Read (num )
261
- return binary .BigEndian .Uint16 (num )
269
+ _ , err := b .Read (num )
270
+ if err != nil {
271
+ return 0 , err
272
+ }
273
+ return binary .BigEndian .Uint16 (num ), nil
262
274
}
263
275
264
276
func encodeUint16 (num uint16 ) []byte {
@@ -268,19 +280,27 @@ func encodeUint16(num uint16) []byte {
268
280
}
269
281
270
282
func encodeString (field string ) []byte {
271
-
272
283
return encodeBytes ([]byte (field ))
273
284
}
274
285
275
- func decodeString (b io.Reader ) string {
276
- return string (decodeBytes (b ))
286
+ func decodeString (b io.Reader ) (string , error ) {
287
+ buf , err := decodeBytes (b )
288
+ return string (buf ), err
277
289
}
278
290
279
- func decodeBytes (b io.Reader ) []byte {
280
- fieldLength := decodeUint16 (b )
291
+ func decodeBytes (b io.Reader ) ([]byte , error ) {
292
+ fieldLength , err := decodeUint16 (b )
293
+ if err != nil {
294
+ return nil , err
295
+ }
296
+
281
297
field := make ([]byte , fieldLength )
282
- b .Read (field )
283
- return field
298
+ _ , err = b .Read (field )
299
+ if err != nil {
300
+ return nil , err
301
+ }
302
+
303
+ return field , nil
284
304
}
285
305
286
306
func encodeBytes (field []byte ) []byte {
@@ -305,18 +325,22 @@ func encodeLength(length int) []byte {
305
325
return encLength
306
326
}
307
327
308
- func decodeLength (r io.Reader ) int {
328
+ func decodeLength (r io.Reader ) ( int , error ) {
309
329
var rLength uint32
310
330
var multiplier uint32
311
331
b := make ([]byte , 1 )
312
332
for multiplier < 27 { //fix: Infinite '(digit & 128) == 1' will cause the dead loop
313
- io .ReadFull (r , b )
333
+ _ , err := io .ReadFull (r , b )
334
+ if err != nil {
335
+ return 0 , err
336
+ }
337
+
314
338
digit := b [0 ]
315
339
rLength |= uint32 (digit & 127 ) << multiplier
316
340
if (digit & 128 ) == 0 {
317
341
break
318
342
}
319
343
multiplier += 7
320
344
}
321
- return int (rLength )
345
+ return int (rLength ), nil
322
346
}
0 commit comments