60
60
MQTT_PINGREQ = b"\xc0 \0 "
61
61
MQTT_PINGRESP = const (0xD0 )
62
62
MQTT_PUBLISH = const (0x30 )
63
- MQTT_SUB = b" \x82 "
63
+ MQTT_SUB = const ( 0x82 )
64
64
MQTT_UNSUB = b"\xA2 "
65
65
MQTT_DISCONNECT = b"\xe0 \0 "
66
66
@@ -626,18 +626,7 @@ def _connect(
626
626
var_header [6 ] |= 0x4 | (self ._lw_qos & 0x1 ) << 3 | (self ._lw_qos & 0x2 ) << 3
627
627
var_header [6 ] |= self ._lw_retain << 5
628
628
629
- # Remaining length calculation
630
- large_rel_length = False
631
- if remaining_length > 0x7F :
632
- large_rel_length = True
633
- # Calculate Remaining Length [2.2.3]
634
- while remaining_length > 0 :
635
- encoded_byte = remaining_length % 0x80
636
- remaining_length = remaining_length // 0x80
637
- # if there is more data to encode, set the top bit of the byte
638
- if remaining_length > 0 :
639
- encoded_byte |= 0x80
640
- fixed_header .append (encoded_byte )
629
+ large_rel_length = self .encode_remaining_length (fixed_header , remaining_length )
641
630
if large_rel_length :
642
631
fixed_header .append (0x00 )
643
632
else :
@@ -680,6 +669,25 @@ def _connect(
680
669
f"No data received from broker for { self ._recv_timeout } seconds."
681
670
)
682
671
672
+ # pylint: disable=no-self-use
673
+ def encode_remaining_length (self , fixed_header , remaining_length ):
674
+ """
675
+ Encode Remaining Length [2.2.3]
676
+ """
677
+ # Remaining length calculation
678
+ large_rel_length = False
679
+ if remaining_length > 0x7F :
680
+ large_rel_length = True
681
+ while remaining_length > 0 :
682
+ encoded_byte = remaining_length % 0x80
683
+ remaining_length = remaining_length // 0x80
684
+ # if there is more data to encode, set the top bit of the byte
685
+ if remaining_length > 0 :
686
+ encoded_byte |= 0x80
687
+ fixed_header .append (encoded_byte )
688
+
689
+ return large_rel_length
690
+
683
691
def disconnect (self ) -> None :
684
692
"""Disconnects the MiniMQTT client from the MQTT broker."""
685
693
self ._connected ()
@@ -812,7 +820,7 @@ def publish(
812
820
813
821
def subscribe (self , topic : str , qos : int = 0 ) -> None :
814
822
"""Subscribes to a topic on the MQTT Broker.
815
- This method can subscribe to one topics or multiple topics.
823
+ This method can subscribe to one topic or multiple topics.
816
824
817
825
:param str|tuple|list topic: Unique MQTT topic identifier string. If
818
826
this is a `tuple`, then the tuple should
@@ -842,20 +850,27 @@ def subscribe(self, topic: str, qos: int = 0) -> None:
842
850
self ._valid_topic (t )
843
851
topics .append ((t , q ))
844
852
# Assemble packet
853
+ self .logger .debug ("Sending SUBSCRIBE to broker..." )
854
+ fixed_header = bytearray ([MQTT_SUB ])
845
855
packet_length = 2 + (2 * len (topics )) + (1 * len (topics ))
846
856
packet_length += sum (len (topic .encode ("utf-8" )) for topic , qos in topics )
847
- packet_length_byte = packet_length .to_bytes (1 , "big" )
857
+ self .encode_remaining_length (fixed_header , remaining_length = packet_length )
858
+ self .logger .debug (f"Fixed Header: { fixed_header } " )
859
+ self ._sock .send (fixed_header )
848
860
self ._pid = self ._pid + 1 if self ._pid < 0xFFFF else 1
849
861
packet_id_bytes = self ._pid .to_bytes (2 , "big" )
850
- # Packet with variable and fixed headers
851
- packet = MQTT_SUB + packet_length_byte + packet_id_bytes
862
+ var_header = packet_id_bytes
863
+ self .logger .debug (f"Variable Header: { var_header } " )
864
+ self ._sock .send (var_header )
852
865
# attaching topic and QOS level to the packet
866
+ packet = bytes ()
853
867
for t , q in topics :
854
868
topic_size = len (t .encode ("utf-8" )).to_bytes (2 , "big" )
855
869
qos_byte = q .to_bytes (1 , "big" )
856
870
packet += topic_size + t .encode () + qos_byte
857
871
for t , q in topics :
858
872
self .logger .debug ("SUBSCRIBING to topic %s with QoS %d" , t , q )
873
+ self .logger .debug (f"packet: { packet } " )
859
874
self ._sock .send (packet )
860
875
stamp = self .get_monotonic_time ()
861
876
while True :
@@ -869,7 +884,7 @@ def subscribe(self, topic: str, qos: int = 0) -> None:
869
884
if op == 0x90 :
870
885
rc = self ._sock_exact_recv (3 )
871
886
# Check packet identifier.
872
- assert rc [1 ] == packet [ 2 ] and rc [2 ] == packet [ 3 ]
887
+ assert rc [1 ] == var_header [ 0 ] and rc [2 ] == var_header [ 1 ]
873
888
remaining_len = rc [0 ] - 2
874
889
assert remaining_len > 0
875
890
rc = self ._sock_exact_recv (remaining_len )
0 commit comments