diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index fff3aa9..1dad804 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -40,7 +40,7 @@ jobs: source actions-ci/install.sh - name: Pip install pylint, black, & Sphinx run: | - pip install --force-reinstall pylint==1.9.2 black==19.10b0 Sphinx sphinx-rtd-theme + pip install --force-reinstall pylint black==19.10b0 Sphinx sphinx-rtd-theme - name: Library version run: git describe --dirty --always --tags - name: PyLint diff --git a/adafruit_esp32spi/PWMOut.py b/adafruit_esp32spi/PWMOut.py index fbdbc05..8b9d6f0 100755 --- a/adafruit_esp32spi/PWMOut.py +++ b/adafruit_esp32spi/PWMOut.py @@ -27,7 +27,8 @@ * Author(s): Brent Rubell """ -class PWMOut(): + +class PWMOut: """ Implementation of CircuitPython PWMOut for ESP32SPI. @@ -37,16 +38,18 @@ class PWMOut(): :param int frequency: The target frequency in Hertz (32-bit). :param bool variable_frequency: True if the frequency will change over time. """ - ESP32_PWM_PINS = set([0, 1, 2, 4, 5, - 12, 13, 14, 15, - 16, 17, 18, 19, - 21, 22, 23, 25, - 26, 27, 32, 33]) - def __init__(self, esp, pwm_pin, *, frequency=500, duty_cycle=0, variable_frequency=False): + + ESP32_PWM_PINS = set( + [0, 1, 2, 4, 5, 12, 13, 14, 15, 16, 17, 18, 19, 21, 22, 23, 25, 26, 27, 32, 33] + ) + + def __init__( + self, esp, pwm_pin, *, frequency=500, duty_cycle=0, variable_frequency=False + ): if pwm_pin in self.ESP32_PWM_PINS: self._pwm_pin = pwm_pin else: - raise AttributeError("Pin %d is not a valid ESP32 GPIO Pin."%pwm_pin) + raise AttributeError("Pin %d is not a valid ESP32 GPIO Pin." % pwm_pin) self._esp = esp self._duty_cycle = duty_cycle self._freq = frequency @@ -67,8 +70,10 @@ def deinit(self): def _is_deinited(self): """Checks if PWMOut object has been previously de-initalized""" if self._pwm_pin is None: - raise ValueError("PWMOut Object has been deinitialized and can no longer " - "be used. Create a new PWMOut object.") + raise ValueError( + "PWMOut Object has been deinitialized and can no longer " + "be used. Create a new PWMOut object." + ) @property def duty_cycle(self): diff --git a/adafruit_esp32spi/adafruit_esp32spi.py b/adafruit_esp32spi/adafruit_esp32spi.py index 66ab7c5..347ea3b 100755 --- a/adafruit_esp32spi/adafruit_esp32spi.py +++ b/adafruit_esp32spi/adafruit_esp32spi.py @@ -52,110 +52,114 @@ __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_ESP32SPI.git" # pylint: disable=bad-whitespace -_SET_NET_CMD = const(0x10) -_SET_PASSPHRASE_CMD = const(0x11) -_SET_AP_NET_CMD = const(0x18) +_SET_NET_CMD = const(0x10) +_SET_PASSPHRASE_CMD = const(0x11) +_SET_AP_NET_CMD = const(0x18) _SET_AP_PASSPHRASE_CMD = const(0x19) -_SET_DEBUG_CMD = const(0x1A) - -_GET_CONN_STATUS_CMD = const(0x20) -_GET_IPADDR_CMD = const(0x21) -_GET_MACADDR_CMD = const(0x22) -_GET_CURR_SSID_CMD = const(0x23) -_GET_CURR_BSSID_CMD = const(0x24) -_GET_CURR_RSSI_CMD = const(0x25) -_GET_CURR_ENCT_CMD = const(0x26) - -_SCAN_NETWORKS = const(0x27) -_START_SERVER_TCP_CMD = const(0x28) -_GET_SOCKET_CMD = const(0x3F) -_GET_STATE_TCP_CMD = const(0x29) -_DATA_SENT_TCP_CMD = const(0x2A) -_AVAIL_DATA_TCP_CMD = const(0x2B) -_GET_DATA_TCP_CMD = const(0x2C) -_START_CLIENT_TCP_CMD = const(0x2D) -_STOP_CLIENT_TCP_CMD = const(0x2E) +_SET_DEBUG_CMD = const(0x1A) + +_GET_CONN_STATUS_CMD = const(0x20) +_GET_IPADDR_CMD = const(0x21) +_GET_MACADDR_CMD = const(0x22) +_GET_CURR_SSID_CMD = const(0x23) +_GET_CURR_BSSID_CMD = const(0x24) +_GET_CURR_RSSI_CMD = const(0x25) +_GET_CURR_ENCT_CMD = const(0x26) + +_SCAN_NETWORKS = const(0x27) +_START_SERVER_TCP_CMD = const(0x28) +_GET_SOCKET_CMD = const(0x3F) +_GET_STATE_TCP_CMD = const(0x29) +_DATA_SENT_TCP_CMD = const(0x2A) +_AVAIL_DATA_TCP_CMD = const(0x2B) +_GET_DATA_TCP_CMD = const(0x2C) +_START_CLIENT_TCP_CMD = const(0x2D) +_STOP_CLIENT_TCP_CMD = const(0x2E) _GET_CLIENT_STATE_TCP_CMD = const(0x2F) -_DISCONNECT_CMD = const(0x30) -_GET_IDX_RSSI_CMD = const(0x32) -_GET_IDX_ENCT_CMD = const(0x33) -_REQ_HOST_BY_NAME_CMD = const(0x34) -_GET_HOST_BY_NAME_CMD = const(0x35) -_START_SCAN_NETWORKS = const(0x36) -_GET_FW_VERSION_CMD = const(0x37) -_GET_TIME = const(0x3B) -_GET_IDX_BSSID_CMD = const(0x3C) -_GET_IDX_CHAN_CMD = const(0x3D) -_PING_CMD = const(0x3E) - -_SEND_DATA_TCP_CMD = const(0x44) -_GET_DATABUF_TCP_CMD = const(0x45) -_SET_ENT_IDENT_CMD = const(0x4A) -_SET_ENT_UNAME_CMD = const(0x4B) -_SET_ENT_PASSWD_CMD = const(0x4C) -_SET_ENT_ENABLE_CMD = const(0x4F) -_SET_CLI_CERT = const(0x40) -_SET_PK = const(0x41) - -_SET_PIN_MODE_CMD = const(0x50) +_DISCONNECT_CMD = const(0x30) +_GET_IDX_RSSI_CMD = const(0x32) +_GET_IDX_ENCT_CMD = const(0x33) +_REQ_HOST_BY_NAME_CMD = const(0x34) +_GET_HOST_BY_NAME_CMD = const(0x35) +_START_SCAN_NETWORKS = const(0x36) +_GET_FW_VERSION_CMD = const(0x37) +_GET_TIME = const(0x3B) +_GET_IDX_BSSID_CMD = const(0x3C) +_GET_IDX_CHAN_CMD = const(0x3D) +_PING_CMD = const(0x3E) + +_SEND_DATA_TCP_CMD = const(0x44) +_GET_DATABUF_TCP_CMD = const(0x45) +_SET_ENT_IDENT_CMD = const(0x4A) +_SET_ENT_UNAME_CMD = const(0x4B) +_SET_ENT_PASSWD_CMD = const(0x4C) +_SET_ENT_ENABLE_CMD = const(0x4F) +_SET_CLI_CERT = const(0x40) +_SET_PK = const(0x41) + +_SET_PIN_MODE_CMD = const(0x50) _SET_DIGITAL_WRITE_CMD = const(0x51) -_SET_ANALOG_WRITE_CMD = const(0x52) -_SET_DIGITAL_READ_CMD = const(0x53) -_SET_ANALOG_READ_CMD = const(0x54) - -_START_CMD = const(0xE0) -_END_CMD = const(0xEE) -_ERR_CMD = const(0xEF) -_REPLY_FLAG = const(1<<7) -_CMD_FLAG = const(0) - -SOCKET_CLOSED = const(0) -SOCKET_LISTEN = const(1) -SOCKET_SYN_SENT = const(2) -SOCKET_SYN_RCVD = const(3) +_SET_ANALOG_WRITE_CMD = const(0x52) +_SET_DIGITAL_READ_CMD = const(0x53) +_SET_ANALOG_READ_CMD = const(0x54) + +_START_CMD = const(0xE0) +_END_CMD = const(0xEE) +_ERR_CMD = const(0xEF) +_REPLY_FLAG = const(1 << 7) +_CMD_FLAG = const(0) + +SOCKET_CLOSED = const(0) +SOCKET_LISTEN = const(1) +SOCKET_SYN_SENT = const(2) +SOCKET_SYN_RCVD = const(3) SOCKET_ESTABLISHED = const(4) -SOCKET_FIN_WAIT_1 = const(5) -SOCKET_FIN_WAIT_2 = const(6) -SOCKET_CLOSE_WAIT = const(7) -SOCKET_CLOSING = const(8) -SOCKET_LAST_ACK = const(9) -SOCKET_TIME_WAIT = const(10) - -WL_NO_SHIELD = const(0xFF) -WL_NO_MODULE = const(0xFF) -WL_IDLE_STATUS = const(0) -WL_NO_SSID_AVAIL = const(1) -WL_SCAN_COMPLETED = const(2) -WL_CONNECTED = const(3) -WL_CONNECT_FAILED = const(4) -WL_CONNECTION_LOST = const(5) -WL_DISCONNECTED = const(6) -WL_AP_LISTENING = const(7) -WL_AP_CONNECTED = const(8) -WL_AP_FAILED = const(9) - -ADC_ATTEN_DB_0 = const(0) +SOCKET_FIN_WAIT_1 = const(5) +SOCKET_FIN_WAIT_2 = const(6) +SOCKET_CLOSE_WAIT = const(7) +SOCKET_CLOSING = const(8) +SOCKET_LAST_ACK = const(9) +SOCKET_TIME_WAIT = const(10) + +WL_NO_SHIELD = const(0xFF) +WL_NO_MODULE = const(0xFF) +WL_IDLE_STATUS = const(0) +WL_NO_SSID_AVAIL = const(1) +WL_SCAN_COMPLETED = const(2) +WL_CONNECTED = const(3) +WL_CONNECT_FAILED = const(4) +WL_CONNECTION_LOST = const(5) +WL_DISCONNECTED = const(6) +WL_AP_LISTENING = const(7) +WL_AP_CONNECTED = const(8) +WL_AP_FAILED = const(9) + +ADC_ATTEN_DB_0 = const(0) ADC_ATTEN_DB_2_5 = const(1) -ADC_ATTEN_DB_6 = const(2) -ADC_ATTEN_DB_11 = const(3) +ADC_ATTEN_DB_6 = const(2) +ADC_ATTEN_DB_11 = const(3) # pylint: enable=bad-whitespace + class ESP_SPIcontrol: # pylint: disable=too-many-public-methods, too-many-instance-attributes """A class that will talk to an ESP32 module programmed with special firmware that lets it act as a fast an efficient WiFi co-processor""" + TCP_MODE = const(0) UDP_MODE = const(1) TLS_MODE = const(2) # pylint: disable=too-many-arguments - def __init__(self, spi, cs_pin, ready_pin, reset_pin, gpio0_pin=None, *, debug=False): + def __init__( + self, spi, cs_pin, ready_pin, reset_pin, gpio0_pin=None, *, debug=False + ): self._debug = debug self.set_psk = False self.set_crt = False self._buffer = bytearray(10) self._pbuf = bytearray(1) # buffer for param read self._sendbuf = bytearray(256) # buffer for command sending - self._socknum_ll = [[0]] # pre-made list of list of socket # + self._socknum_ll = [[0]] # pre-made list of list of socket # self._spi_device = SPIDevice(spi, cs_pin, baudrate=8000000) self._cs = cs_pin @@ -168,6 +172,7 @@ def __init__(self, spi, cs_pin, ready_pin, reset_pin, gpio0_pin=None, *, debug=F if self._gpio0: self._gpio0.direction = Direction.INPUT self.reset() + # pylint: enable=too-many-arguments def reset(self): @@ -179,22 +184,22 @@ def reset(self): self._gpio0.value = True # not bootload mode self._cs.value = True self._reset.value = False - time.sleep(0.01) # reset + time.sleep(0.01) # reset self._reset.value = True - time.sleep(0.75) # wait for it to boot up + time.sleep(0.75) # wait for it to boot up if self._gpio0: self._gpio0.direction = Direction.INPUT def _wait_for_ready(self): """Wait until the ready pin goes low""" if self._debug >= 3: - print("Wait for ESP32 ready", end='') + print("Wait for ESP32 ready", end="") times = time.monotonic() while (time.monotonic() - times) < 10: # wait up to 10 seconds - if not self._ready.value: # we're ready! + if not self._ready.value: # we're ready! break if self._debug >= 3: - print('.', end='') + print(".", end="") time.sleep(0.05) else: raise RuntimeError("ESP32 not responding") @@ -207,12 +212,12 @@ def _send_command(self, cmd, params=None, *, param_len_16=False): if not params: params = () - packet_len = 4 # header + end byte + packet_len = 4 # header + end byte for i, param in enumerate(params): - packet_len += len(param) # parameter - packet_len += 1 # size byte + packet_len += len(param) # parameter + packet_len += 1 # size byte if param_len_16: - packet_len += 1 # 2 of em here! + packet_len += 1 # 2 of em here! while packet_len % 4 != 0: packet_len += 1 # we may need more space @@ -234,21 +239,24 @@ def _send_command(self, cmd, params=None, *, param_len_16=False): self._sendbuf[ptr] = len(param) & 0xFF ptr += 1 for j, par in enumerate(param): - self._sendbuf[ptr+j] = par + self._sendbuf[ptr + j] = par ptr += len(param) self._sendbuf[ptr] = _END_CMD self._wait_for_ready() with self._spi_device as spi: times = time.monotonic() - while (time.monotonic() - times) < 1: # wait up to 1000ms + while (time.monotonic() - times) < 1: # wait up to 1000ms if self._ready.value: # ok ready to send! break else: raise RuntimeError("ESP32 timed out on SPI select") - spi.write(self._sendbuf, start=0, end=packet_len) # pylint: disable=no-member + spi.write( + self._sendbuf, start=0, end=packet_len + ) # pylint: disable=no-member if self._debug >= 3: print("Wrote: ", [hex(b) for b in self._sendbuf[0:packet_len]]) + # pylint: disable=too-many-branches def _read_byte(self, spi): @@ -290,7 +298,7 @@ def _wait_response_cmd(self, cmd, num_responses=None, *, param_len_16=False): responses = [] with self._spi_device as spi: times = time.monotonic() - while (time.monotonic() - times) < 1: # wait up to 1000ms + while (time.monotonic() - times) < 1: # wait up to 1000ms if self._ready.value: # ok ready to send! break else: @@ -318,12 +326,20 @@ def _wait_response_cmd(self, cmd, num_responses=None, *, param_len_16=False): print("Read %d: " % len(responses[0]), responses) return responses - def _send_command_get_response(self, cmd, params=None, *, - reply_params=1, sent_param_len_16=False, - recv_param_len_16=False): + def _send_command_get_response( + self, + cmd, + params=None, + *, + reply_params=1, + sent_param_len_16=False, + recv_param_len_16=False + ): """Send a high level SPI command, wait and return the response""" self._send_command(cmd, params, param_len_16=sent_param_len_16) - return self._wait_response_cmd(cmd, reply_params, param_len_16=recv_param_len_16) + return self._wait_response_cmd( + cmd, reply_params, param_len_16=recv_param_len_16 + ) @property def status(self): @@ -336,7 +352,7 @@ def status(self): resp = self._send_command_get_response(_GET_CONN_STATUS_CMD) if self._debug: print("Conn status:", resp[0][0]) - return resp[0][0] # one byte response + return resp[0][0] # one byte response @property def firmware_version(self): @@ -347,19 +363,19 @@ def firmware_version(self): return resp[0] @property - def MAC_address(self): # pylint: disable=invalid-name + def MAC_address(self): # pylint: disable=invalid-name """A bytearray containing the MAC address of the ESP32""" if self._debug: print("MAC address") - resp = self._send_command_get_response(_GET_MACADDR_CMD, [b'\xFF']) + resp = self._send_command_get_response(_GET_MACADDR_CMD, [b"\xFF"]) return resp[0] @property - def MAC_address_actual(self): # pylint: disable=invalid-name + def MAC_address_actual(self): # pylint: disable=invalid-name """A bytearray containing the actual MAC address of the ESP32""" if self._debug: print("MAC address") - resp = self._send_command_get_response(_GET_MACADDR_CMD, [b'\xFF']) + resp = self._send_command_get_response(_GET_MACADDR_CMD, [b"\xFF"]) new_resp = bytearray(resp[0]) new_resp = reversed(new_resp) return new_resp @@ -378,18 +394,18 @@ def get_scan_networks(self): 'ssid', 'rssi', 'encryption', bssid, and channel entries, one for each AP found""" self._send_command(_SCAN_NETWORKS) names = self._wait_response_cmd(_SCAN_NETWORKS) - #print("SSID names:", names) - APs = [] # pylint: disable=invalid-name + # print("SSID names:", names) + APs = [] # pylint: disable=invalid-name for i, name in enumerate(names): - a_p = {'ssid': name} + a_p = {"ssid": name} rssi = self._send_command_get_response(_GET_IDX_RSSI_CMD, ((i,),))[0] - a_p['rssi'] = struct.unpack('H', port) - if isinstance(dest, str): # use the 5 arg version - dest = bytes(dest, 'utf-8') - resp = self._send_command_get_response(_START_CLIENT_TCP_CMD, - (dest, b'\x00\x00\x00\x00', - port_param, - self._socknum_ll[0], - (conn_mode,))) - else: # ip address, use 4 arg vesion - resp = self._send_command_get_response(_START_CLIENT_TCP_CMD, - (dest, port_param, - self._socknum_ll[0], - (conn_mode,))) + port_param = struct.pack(">H", port) + if isinstance(dest, str): # use the 5 arg version + dest = bytes(dest, "utf-8") + resp = self._send_command_get_response( + _START_CLIENT_TCP_CMD, + ( + dest, + b"\x00\x00\x00\x00", + port_param, + self._socknum_ll[0], + (conn_mode,), + ), + ) + else: # ip address, use 4 arg vesion + resp = self._send_command_get_response( + _START_CLIENT_TCP_CMD, + (dest, port_param, self._socknum_ll[0], (conn_mode,)), + ) if resp[0][0] != 1: raise RuntimeError("Could not connect to remote server") @@ -653,7 +680,9 @@ def socket_status(self, socket_num): SOCKET_FIN_WAIT_2, SOCKET_CLOSE_WAIT, SOCKET_CLOSING, SOCKET_LAST_ACK, or SOCKET_TIME_WAIT""" self._socknum_ll[0][0] = socket_num - resp = self._send_command_get_response(_GET_CLIENT_STATE_TCP_CMD, self._socknum_ll) + resp = self._send_command_get_response( + _GET_CLIENT_STATE_TCP_CMD, self._socknum_ll + ) return resp[0][0] def socket_connected(self, socket_num): @@ -666,15 +695,21 @@ def socket_write(self, socket_num, buffer): print("Writing:", buffer) self._socknum_ll[0][0] = socket_num sent = 0 - for chunk in range((len(buffer) // 64)+1): - resp = self._send_command_get_response(_SEND_DATA_TCP_CMD, - (self._socknum_ll[0], - memoryview(buffer)[(chunk*64):((chunk+1)*64)]), - sent_param_len_16=True) + for chunk in range((len(buffer) // 64) + 1): + resp = self._send_command_get_response( + _SEND_DATA_TCP_CMD, + ( + self._socknum_ll[0], + memoryview(buffer)[(chunk * 64) : ((chunk + 1) * 64)], + ), + sent_param_len_16=True, + ) sent += resp[0][0] if sent != len(buffer): - raise RuntimeError("Failed to send %d bytes (sent %d)" % (len(buffer), sent)) + raise RuntimeError( + "Failed to send %d bytes (sent %d)" % (len(buffer), sent) + ) resp = self._send_command_get_response(_DATA_SENT_TCP_CMD, self._socknum_ll) if resp[0][0] != 1: @@ -684,7 +719,7 @@ def socket_available(self, socket_num): """Determine how many bytes are waiting to be read on the socket""" self._socknum_ll[0][0] = socket_num resp = self._send_command_get_response(_AVAIL_DATA_TCP_CMD, self._socknum_ll) - reply = struct.unpack('> 8) & 0xFF)), - sent_param_len_16=True, - recv_param_len_16=True) + resp = self._send_command_get_response( + _GET_DATABUF_TCP_CMD, + (self._socknum_ll[0], (size & 0xFF, (size >> 8) & 0xFF)), + sent_param_len_16=True, + recv_param_len_16=True, + ) return bytes(resp[0]) def socket_connect(self, socket_num, dest, port, conn_mode=TCP_MODE): @@ -727,12 +765,14 @@ def socket_close(self, socket_num): if resp[0][0] != 1: raise RuntimeError("Failed to close socket") - def start_server(self, port, socket_num, conn_mode=TCP_MODE, ip=None): # pylint: disable=invalid-name + def start_server( + self, port, socket_num, conn_mode=TCP_MODE, ip=None + ): # pylint: disable=invalid-name """Opens a server on the specified port, using the ESP32's internal reference number""" if self._debug: print("*** starting server") self._socknum_ll[0][0] = socket_num - params = [struct.pack('>H', port), self._socknum_ll[0], (conn_mode,)] + params = [struct.pack(">H", port), self._socknum_ll[0], (conn_mode,)] if ip: params.insert(0, ip) resp = self._send_command_get_response(_START_SERVER_TCP_CMD, params) @@ -766,8 +806,7 @@ def set_pin_mode(self, pin, mode): pin_mode = 0 else: pin_mode = mode - resp = self._send_command_get_response(_SET_PIN_MODE_CMD, - ((pin,), (pin_mode,))) + resp = self._send_command_get_response(_SET_PIN_MODE_CMD, ((pin,), (pin_mode,))) if resp[0][0] != 1: raise RuntimeError("Failed to set pin mode") @@ -778,8 +817,9 @@ def set_digital_write(self, pin, value): :param int pin: ESP32 GPIO pin to write to. :param bool value: Value for the pin. """ - resp = self._send_command_get_response(_SET_DIGITAL_WRITE_CMD, - ((pin,), (value,))) + resp = self._send_command_get_response( + _SET_DIGITAL_WRITE_CMD, ((pin,), (value,)) + ) if resp[0][0] != 1: raise RuntimeError("Failed to write to pin") @@ -791,8 +831,9 @@ def set_analog_write(self, pin, analog_value): :param float value: 0=off 1.0=full on """ value = int(255 * analog_value) - resp = self._send_command_get_response(_SET_ANALOG_WRITE_CMD, - ((pin,), (value,))) + resp = self._send_command_get_response( + _SET_ANALOG_WRITE_CMD, ((pin,), (value,)) + ) if resp[0][0] != 1: raise RuntimeError("Failed to write to pin") @@ -806,14 +847,14 @@ def set_digital_read(self, pin): fw_semver_maj = bytes(self.firmware_version).decode("utf-8")[2] assert int(fw_semver_maj) >= 5, "Please update nina-fw to 1.5.0 or above." - resp = self._send_command_get_response(_SET_DIGITAL_READ_CMD, - ((pin,),))[0] + resp = self._send_command_get_response(_SET_DIGITAL_READ_CMD, ((pin,),))[0] if resp[0] == 0: return False - elif resp[0] == 1: + if resp[0] == 1: return True - else: - raise ValueError("_SET_DIGITAL_READ response error: response is not boolean", resp[0]) + raise ValueError( + "_SET_DIGITAL_READ response error: response is not boolean", resp[0] + ) def set_analog_read(self, pin, atten=ADC_ATTEN_DB_11): """ @@ -826,11 +867,12 @@ def set_analog_read(self, pin, atten=ADC_ATTEN_DB_11): fw_semver_maj = bytes(self.firmware_version).decode("utf-8")[2] assert int(fw_semver_maj) >= 5, "Please update nina-fw to 1.5.0 or above." - resp = self._send_command_get_response(_SET_ANALOG_READ_CMD, - ((pin,), (atten,))) - resp_analog = struct.unpack(' 0 and time.monotonic() - stamp > self._timeout: self.close() # Make sure to close socket so that we don't exhaust sockets. raise RuntimeError("Didn't receive full response, failing out") - firstline, self._buffer = self._buffer.split(b'\r\n', 1) + firstline, self._buffer = self._buffer.split(b"\r\n", 1) gc.collect() return firstline @@ -115,8 +126,8 @@ def recv(self, bufsize=0): """Reads some bytes from the connected remote address. :param int bufsize: maximum number of bytes to receive """ - #print("Socket read", bufsize) - if bufsize == 0: # read as much as we can at the moment + # print("Socket read", bufsize) + if bufsize == 0: # read as much as we can at the moment while True: avail = self.available() if avail: @@ -125,7 +136,7 @@ def recv(self, bufsize=0): break gc.collect() ret = self._buffer - self._buffer = b'' + self._buffer = b"" gc.collect() return ret stamp = time.monotonic() @@ -133,7 +144,7 @@ def recv(self, bufsize=0): to_read = bufsize - len(self._buffer) received = [] while to_read > 0: - #print("Bytes to read:", to_read) + # print("Bytes to read:", to_read) avail = self.available() if avail: stamp = time.monotonic() @@ -143,13 +154,13 @@ def recv(self, bufsize=0): gc.collect() if self._timeout > 0 and time.monotonic() - stamp > self._timeout: break - #print(received) - self._buffer += b''.join(received) + # print(received) + self._buffer += b"".join(received) ret = None if len(self._buffer) == bufsize: ret = self._buffer - self._buffer = b'' + self._buffer = b"" else: ret = self._buffer[:bufsize] self._buffer = self._buffer[bufsize:] @@ -177,22 +188,23 @@ def connected(self): """Whether or not we are connected to the socket""" if self.socknum == NO_SOCKET_AVAIL: return False - elif self.available(): + if self.available(): return True - else: - status = _the_interface.socket_status(self.socknum) - result = status not in (adafruit_esp32spi.SOCKET_LISTEN, - adafruit_esp32spi.SOCKET_CLOSED, - adafruit_esp32spi.SOCKET_FIN_WAIT_1, - adafruit_esp32spi.SOCKET_FIN_WAIT_2, - adafruit_esp32spi.SOCKET_TIME_WAIT, - adafruit_esp32spi.SOCKET_SYN_SENT, - adafruit_esp32spi.SOCKET_SYN_RCVD, - adafruit_esp32spi.SOCKET_CLOSE_WAIT) - if not result: - self.close() - self._socknum = NO_SOCKET_AVAIL - return result + status = _the_interface.socket_status(self.socknum) + result = status not in ( + adafruit_esp32spi.SOCKET_LISTEN, + adafruit_esp32spi.SOCKET_CLOSED, + adafruit_esp32spi.SOCKET_FIN_WAIT_1, + adafruit_esp32spi.SOCKET_FIN_WAIT_2, + adafruit_esp32spi.SOCKET_TIME_WAIT, + adafruit_esp32spi.SOCKET_SYN_SENT, + adafruit_esp32spi.SOCKET_SYN_RCVD, + adafruit_esp32spi.SOCKET_CLOSE_WAIT, + ) + if not result: + self.close() + self._socknum = NO_SOCKET_AVAIL + return result @property def socknum(self): @@ -202,4 +214,6 @@ def socknum(self): def close(self): """Close the socket, after reading whatever remains""" _the_interface.socket_close(self._socknum) + + # pylint: enable=unused-argument, redefined-builtin, invalid-name diff --git a/adafruit_esp32spi/adafruit_esp32spi_wifimanager.py b/adafruit_esp32spi/adafruit_esp32spi_wifimanager.py index 6505863..1506a6e 100755 --- a/adafruit_esp32spi/adafruit_esp32spi_wifimanager.py +++ b/adafruit_esp32spi/adafruit_esp32spi_wifimanager.py @@ -42,12 +42,20 @@ class ESPSPI_WiFiManager: """ A class to help manage the Wifi connection """ + NORMAL = const(1) ENTERPRISE = const(2) # pylint: disable=too-many-arguments - def __init__(self, esp, secrets, status_pixel=None, attempts=2, - connection_type=NORMAL, debug=False): + def __init__( + self, + esp, + secrets, + status_pixel=None, + attempts=2, + connection_type=NORMAL, + debug=False, + ): """ :param ESP_SPIcontrol esp: The ESP object we are using :param dict secrets: The WiFi and Adafruit IO secrets dict (See examples) @@ -60,8 +68,8 @@ def __init__(self, esp, secrets, status_pixel=None, attempts=2, # Read the settings self.esp = esp self.debug = debug - self.ssid = secrets['ssid'] - self.password = secrets.get('password', None) + self.ssid = secrets["ssid"] + self.password = secrets.get("password", None) self.attempts = attempts self._connection_type = connection_type requests.set_socket(socket, esp) @@ -69,18 +77,19 @@ def __init__(self, esp, secrets, status_pixel=None, attempts=2, self.pixel_status(0) # Check for WPA2 Enterprise keys in the secrets dictionary and load them if they exist - if secrets.get('ent_ssid'): - self.ent_ssid = secrets['ent_ssid'] + if secrets.get("ent_ssid"): + self.ent_ssid = secrets["ent_ssid"] else: - self.ent_ssid = secrets['ssid'] - if secrets.get('ent_ident'): - self.ent_ident = secrets['ent_ident'] + self.ent_ssid = secrets["ssid"] + if secrets.get("ent_ident"): + self.ent_ident = secrets["ent_ident"] else: - self.ent_ident = '' - if secrets.get('ent_user'): - self.ent_user = secrets['ent_user'] - if secrets.get('ent_password'): - self.ent_password = secrets['ent_password'] + self.ent_ident = "" + if secrets.get("ent_user"): + self.ent_user = secrets["ent_user"] + if secrets.get("ent_password"): + self.ent_password = secrets["ent_password"] + # pylint: enable=too-many-arguments def reset(self): @@ -101,7 +110,10 @@ def connect(self): print("Firmware vers.", self.esp.firmware_version) print("MAC addr:", [hex(i) for i in self.esp.MAC_address]) for access_pt in self.esp.scan_networks(): - print("\t%s\t\tRSSI: %d" % (str(access_pt['ssid'], 'utf-8'), access_pt['rssi'])) + print( + "\t%s\t\tRSSI: %d" + % (str(access_pt["ssid"], "utf-8"), access_pt["rssi"]) + ) if self._connection_type == ESPSPI_WiFiManager.NORMAL: self.connect_normal() elif self._connection_type == ESPSPI_WiFiManager.ENTERPRISE: @@ -119,7 +131,9 @@ def connect_normal(self): if self.debug: print("Connecting to AP...") self.pixel_status((100, 0, 0)) - self.esp.connect_AP(bytes(self.ssid, 'utf-8'), bytes(self.password, 'utf-8')) + self.esp.connect_AP( + bytes(self.ssid, "utf-8"), bytes(self.password, "utf-8") + ) failure_count = 0 self.pixel_status((0, 100, 0)) except (ValueError, RuntimeError) as error: @@ -143,9 +157,11 @@ def create_ap(self): print("Waiting for AP to be initialized...") self.pixel_status((100, 0, 0)) if self.password: - self.esp.create_AP(bytes(self.ssid, 'utf-8'), bytes(self.password, 'utf-8')) + self.esp.create_AP( + bytes(self.ssid, "utf-8"), bytes(self.password, "utf-8") + ) else: - self.esp.create_AP(bytes(self.ssid, 'utf-8'), None) + self.esp.create_AP(bytes(self.ssid, "utf-8"), None) failure_count = 0 self.pixel_status((0, 100, 0)) except (ValueError, RuntimeError) as error: @@ -162,15 +178,17 @@ def connect_enterprise(self): Attempt an enterprise style WiFi connection """ failure_count = 0 - self.esp.wifi_set_network(bytes(self.ent_ssid, 'utf-8')) - self.esp.wifi_set_entidentity(bytes(self.ent_ident, 'utf-8')) - self.esp.wifi_set_entusername(bytes(self.ent_user, 'utf-8')) - self.esp.wifi_set_entpassword(bytes(self.ent_password, 'utf-8')) + self.esp.wifi_set_network(bytes(self.ent_ssid, "utf-8")) + self.esp.wifi_set_entidentity(bytes(self.ent_ident, "utf-8")) + self.esp.wifi_set_entusername(bytes(self.ent_user, "utf-8")) + self.esp.wifi_set_entpassword(bytes(self.ent_password, "utf-8")) self.esp.wifi_set_entenable() while not self.esp.is_connected: try: if self.debug: - print("Waiting for the ESP32 to connect to the WPA2 Enterprise AP...") + print( + "Waiting for the ESP32 to connect to the WPA2 Enterprise AP..." + ) self.pixel_status((100, 0, 0)) sleep(1) failure_count = 0 @@ -312,7 +330,7 @@ def pixel_status(self, value): :type value: int or 3-value tuple """ if self.statuspix: - if hasattr(self.statuspix, 'color'): + if hasattr(self.statuspix, "color"): self.statuspix.color = value else: self.statuspix.fill(value) diff --git a/adafruit_esp32spi/adafruit_esp32spi_wsgiserver.py b/adafruit_esp32spi/adafruit_esp32spi_wsgiserver.py index eb1b906..9218453 100644 --- a/adafruit_esp32spi/adafruit_esp32spi_wsgiserver.py +++ b/adafruit_esp32spi/adafruit_esp32spi_wsgiserver.py @@ -52,13 +52,16 @@ import adafruit_esp32spi.adafruit_esp32spi_socket as socket from adafruit_requests import parse_headers -_the_interface = None # pylint: disable=invalid-name +_the_interface = None # pylint: disable=invalid-name + + def set_interface(iface): """Helper to set the global internet interface""" - global _the_interface # pylint: disable=global-statement, invalid-name + global _the_interface # pylint: disable=global-statement, invalid-name _the_interface = iface socket.set_interface(iface) + NO_SOCK_AVAIL = const(255) # pylint: disable=invalid-name @@ -88,7 +91,10 @@ def start(self): if self._debug: ip = _the_interface.pretty_ip(_the_interface.ip_address) print("Server available at {0}:{1}".format(ip, self.port)) - print("Sever status: ", _the_interface.get_server_state(self._server_sock.socknum)) + print( + "Sever status: ", + _the_interface.get_server_state(self._server_sock.socknum), + ) def update_poll(self): """ @@ -97,7 +103,7 @@ def update_poll(self): the application callable will be invoked. """ self.client_available() - if (self._client_sock and self._client_sock.available()): + if self._client_sock and self._client_sock.available(): environ = self._get_environ(self._client_sock) result = self.application(environ, self._start_response) self.finish_response(result) @@ -145,7 +151,9 @@ def client_available(self): # check for new client sock if self._debug > 2: print("checking for new client sock") - client_sock_num = _the_interface.socket_available(self._server_sock.socknum) + client_sock_num = _the_interface.socket_available( + self._server_sock.socknum + ) sock = socket.socket(socknum=client_sock_num) else: print("Server has not been started, cannot check for clients!") @@ -210,7 +218,7 @@ def _get_environ(self, client): body = client.read() env["wsgi.input"] = io.StringIO(body) for name, value in headers.items(): - key = "HTTP_" + name.replace('-', '_').upper() + key = "HTTP_" + name.replace("-", "_").upper() if key in env: value = "{0},{1}".format(env[key], value) env[key] = value diff --git a/adafruit_esp32spi/digitalio.py b/adafruit_esp32spi/digitalio.py index 49f5c35..48fcca3 100755 --- a/adafruit_esp32spi/digitalio.py +++ b/adafruit_esp32spi/digitalio.py @@ -31,6 +31,7 @@ """ from micropython import const + class Pin: """ Implementation of CircuitPython API Pin Handling @@ -42,7 +43,8 @@ class Pin: NOTE: This class does not currently implement reading digital pins or the use of internal pull-up resistors. """ - #pylint: disable=invalid-name + + # pylint: disable=invalid-name IN = const(0x00) OUT = const(0x01) LOW = const(0x00) @@ -51,17 +53,15 @@ class Pin: _mode = IN pin_id = None - ESP32_GPIO_PINS = set([0, 1, 2, 4, 5, - 12, 13, 14, 15, - 16, 17, 18, 19, - 21, 22, 23, 25, - 26, 27, 32, 33]) + ESP32_GPIO_PINS = set( + [0, 1, 2, 4, 5, 12, 13, 14, 15, 16, 17, 18, 19, 21, 22, 23, 25, 26, 27, 32, 33] + ) def __init__(self, esp_pin, esp): if esp_pin in self.ESP32_GPIO_PINS: self.pin_id = esp_pin else: - raise AttributeError("Pin %d is not a valid ESP32 GPIO Pin."%esp_pin) + raise AttributeError("Pin %d is not a valid ESP32 GPIO Pin." % esp_pin) self._esp = esp def init(self, mode=IN): @@ -92,36 +92,46 @@ def value(self, val=None): else: raise RuntimeError("Invalid value for pin") else: - raise NotImplementedError("digitalRead not currently implemented in esp32spi") + raise NotImplementedError( + "digitalRead not currently implemented in esp32spi" + ) def __repr__(self): return str(self.pin_id) + # pylint: disable = too-few-public-methods -class DriveMode(): +class DriveMode: """DriveMode Enum.""" + PUSH_PULL = None OPEN_DRAIN = None + + DriveMode.PUSH_PULL = DriveMode() DriveMode.OPEN_DRAIN = DriveMode() -class Direction(): +class Direction: """DriveMode Enum.""" + INPUT = None OUTPUT = None + + Direction.INPUT = Direction() Direction.OUTPUT = Direction() -class DigitalInOut(): +class DigitalInOut: """Implementation of DigitalIO module for ESP32SPI. :param ESP_SPIcontrol esp: The ESP object we are using. :param int pin: Valid ESP32 GPIO Pin, predefined in ESP32_GPIO_PINS. """ + _pin = None - #pylint: disable = attribute-defined-outside-init + # pylint: disable = attribute-defined-outside-init def __init__(self, esp, pin): self._esp = esp self._pin = Pin(pin, self._esp) @@ -150,7 +160,9 @@ def switch_to_input(self, pull=None): """Sets the pull and then switch to read in digital values. :param Pull pull: Pull configuration for the input. """ - raise NotImplementedError("Digital reads are not currently supported in ESP32SPI.") + raise NotImplementedError( + "Digital reads are not currently supported in ESP32SPI." + ) @property def direction(self): @@ -175,7 +187,7 @@ def direction(self, pin_dir): @property def value(self): """Returns the digital logic level value of the pin.""" - return self._pin.value() is 1 + return self._pin.value() == 1 @value.setter def value(self, val): @@ -204,6 +216,8 @@ def drive_mode(self, mode): """ self.__drive_mode = mode if mode is DriveMode.OPEN_DRAIN: - raise NotImplementedError('Drive mode %s not implemented in ESP32SPI.'%mode) - elif mode is DriveMode.PUSH_PULL: + raise NotImplementedError( + "Drive mode %s not implemented in ESP32SPI." % mode + ) + if mode is DriveMode.PUSH_PULL: self._pin.init(mode=Pin.OUT) diff --git a/docs/conf.py b/docs/conf.py index e97e6da..8ce589c 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -2,7 +2,8 @@ import os import sys -sys.path.insert(0, os.path.abspath('..')) + +sys.path.insert(0, os.path.abspath("..")) # -- General configuration ------------------------------------------------ @@ -10,10 +11,10 @@ # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.intersphinx', - 'sphinx.ext.napoleon', - 'sphinx.ext.todo', + "sphinx.ext.autodoc", + "sphinx.ext.intersphinx", + "sphinx.ext.napoleon", + "sphinx.ext.todo", ] # TODO: Please Read! @@ -23,29 +24,36 @@ # autodoc_mock_imports = ["digitalio", "busio"] -intersphinx_mapping = {'python': ('https://docs.python.org/3.4', None),'BusDevice': ('https://circuitpython.readthedocs.io/projects/busdevice/en/latest/', None),'CircuitPython': ('https://circuitpython.readthedocs.io/en/latest/', None)} +intersphinx_mapping = { + "python": ("https://docs.python.org/3.4", None), + "BusDevice": ( + "https://circuitpython.readthedocs.io/projects/busdevice/en/latest/", + None, + ), + "CircuitPython": ("https://circuitpython.readthedocs.io/en/latest/", None), +} # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] -source_suffix = '.rst' +source_suffix = ".rst" # The master toctree document. -master_doc = 'index' +master_doc = "index" # General information about the project. -project = u'Adafruit ESP32SPI Library' -copyright = u'2019 ladyada' -author = u'ladyada' +project = u"Adafruit ESP32SPI Library" +copyright = u"2019 ladyada" +author = u"ladyada" # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. # # The short X.Y version. -version = u'1.0' +version = u"1.0" # The full version, including alpha/beta/rc tags. -release = u'1.0' +release = u"1.0" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. @@ -57,7 +65,7 @@ # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. # This patterns also effect to html_static_path and html_extra_path -exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store', '.env', 'CODE_OF_CONDUCT.md'] +exclude_patterns = ["_build", "Thumbs.db", ".DS_Store", ".env", "CODE_OF_CONDUCT.md"] # The reST default role (used for this markup: `text`) to use for all # documents. @@ -69,7 +77,7 @@ add_function_parentheses = True # The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' +pygments_style = "sphinx" # If true, `todo` and `todoList` produce output, else they produce nothing. todo_include_todos = False @@ -84,59 +92,62 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -on_rtd = os.environ.get('READTHEDOCS', None) == 'True' +on_rtd = os.environ.get("READTHEDOCS", None) == "True" if not on_rtd: # only import and set the theme if we're building docs locally try: import sphinx_rtd_theme - html_theme = 'sphinx_rtd_theme' - html_theme_path = [sphinx_rtd_theme.get_html_theme_path(), '.'] + + html_theme = "sphinx_rtd_theme" + html_theme_path = [sphinx_rtd_theme.get_html_theme_path(), "."] except: - html_theme = 'default' - html_theme_path = ['.'] + html_theme = "default" + html_theme_path = ["."] else: - html_theme_path = ['.'] + html_theme_path = ["."] # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +html_static_path = ["_static"] # The name of an image file (relative to this directory) to use as a favicon of # the docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 # pixels large. # -html_favicon = '_static/favicon.ico' +html_favicon = "_static/favicon.ico" # Output file base name for HTML help builder. -htmlhelp_basename = 'AdafruitEsp32spiLibrarydoc' +htmlhelp_basename = "AdafruitEsp32spiLibrarydoc" # -- Options for LaTeX output --------------------------------------------- latex_elements = { - # The paper size ('letterpaper' or 'a4paper'). - # - # 'papersize': 'letterpaper', - - # The font size ('10pt', '11pt' or '12pt'). - # - # 'pointsize': '10pt', - - # Additional stuff for the LaTeX preamble. - # - # 'preamble': '', - - # Latex figure (float) alignment - # - # 'figure_align': 'htbp', + # The paper size ('letterpaper' or 'a4paper'). + # + # 'papersize': 'letterpaper', + # The font size ('10pt', '11pt' or '12pt'). + # + # 'pointsize': '10pt', + # Additional stuff for the LaTeX preamble. + # + # 'preamble': '', + # Latex figure (float) alignment + # + # 'figure_align': 'htbp', } # Grouping the document tree into LaTeX files. List of tuples # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - (master_doc, 'AdafruitESP32SPILibrary.tex', u'AdafruitESP32SPI Library Documentation', - author, 'manual'), + ( + master_doc, + "AdafruitESP32SPILibrary.tex", + u"AdafruitESP32SPI Library Documentation", + author, + "manual", + ), ] # -- Options for manual page output --------------------------------------- @@ -144,8 +155,13 @@ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). man_pages = [ - (master_doc, 'AdafruitESP32SPIlibrary', u'Adafruit ESP32SPI Library Documentation', - [author], 1) + ( + master_doc, + "AdafruitESP32SPIlibrary", + u"Adafruit ESP32SPI Library Documentation", + [author], + 1, + ) ] # -- Options for Texinfo output ------------------------------------------- @@ -154,7 +170,13 @@ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, 'AdafruitESP32SPILibrary', u'Adafruit ESP32SPI Library Documentation', - author, 'AdafruitESP32SPILibrary', 'One line description of project.', - 'Miscellaneous'), + ( + master_doc, + "AdafruitESP32SPILibrary", + u"Adafruit ESP32SPI Library Documentation", + author, + "AdafruitESP32SPILibrary", + "One line description of project.", + "Miscellaneous", + ), ] diff --git a/examples/esp32spi_aio_post.py b/examples/esp32spi_aio_post.py index dfbf194..d676f31 100644 --- a/examples/esp32spi_aio_post.py +++ b/examples/esp32spi_aio_post.py @@ -28,7 +28,9 @@ spi = busio.SPI(board.SCK, board.MOSI, board.MISO) esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset) """Use below for Most Boards""" -status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2) # Uncomment for Most Boards +status_light = neopixel.NeoPixel( + board.NEOPIXEL, 1, brightness=0.2 +) # Uncomment for Most Boards """Uncomment below for ItsyBitsy M4""" # status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2) # Uncomment below for an externally defined RGB LED @@ -44,14 +46,19 @@ while True: try: - print("Posting data...", end='') + print("Posting data...", end="") data = counter - feed = 'test' - payload = {'value':data} + feed = "test" + payload = {"value": data} response = wifi.post( - "https://io.adafruit.com/api/v2/"+secrets['aio_username']+"/feeds/"+feed+"/data", + "https://io.adafruit.com/api/v2/" + + secrets["aio_username"] + + "/feeds/" + + feed + + "/data", json=payload, - headers={"X-AIO-KEY":secrets['aio_key']}) + headers={"X-AIO-KEY": secrets["aio_key"]}, + ) print(response.json()) response.close() counter = counter + 1 diff --git a/examples/esp32spi_cheerlights.py b/examples/esp32spi_cheerlights.py index 51f8f2d..78a131c 100644 --- a/examples/esp32spi_cheerlights.py +++ b/examples/esp32spi_cheerlights.py @@ -27,9 +27,11 @@ spi = busio.SPI(board.SCK, board.MOSI, board.MISO) esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset) """Use below for Most Boards""" -status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2) # Uncomment for Most Boards +status_light = neopixel.NeoPixel( + board.NEOPIXEL, 1, brightness=0.2 +) # Uncomment for Most Boards """Uncomment below for ItsyBitsy M4""" -#status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2) +# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2) wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light) # neopixels @@ -60,7 +62,7 @@ color = int(value[1:], 16) red = color >> 16 & 0xFF green = color >> 8 & 0xFF - blue = color& 0xFF + blue = color & 0xFF gamma_corrected = fancy.gamma_adjust(fancy.CRGB(red, green, blue)).pack() pixels.fill(gamma_corrected) diff --git a/examples/esp32spi_localtime.py b/examples/esp32spi_localtime.py index f0d408e..b2c0015 100644 --- a/examples/esp32spi_localtime.py +++ b/examples/esp32spi_localtime.py @@ -24,9 +24,11 @@ spi = busio.SPI(board.SCK, board.MOSI, board.MISO) esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset) """Use below for Most Boards""" -status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2) # Uncomment for Most Boards +status_light = neopixel.NeoPixel( + board.NEOPIXEL, 1, brightness=0.2 +) # Uncomment for Most Boards """Uncomment below for ItsyBitsy M4""" -#status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2) +# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2) wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light) the_rtc = rtc.RTC() @@ -42,18 +44,20 @@ continue json = response.json() -current_time = json['datetime'] -the_date, the_time = current_time.split('T') -year, month, mday = [int(x) for x in the_date.split('-')] -the_time = the_time.split('.')[0] -hours, minutes, seconds = [int(x) for x in the_time.split(':')] +current_time = json["datetime"] +the_date, the_time = current_time.split("T") +year, month, mday = [int(x) for x in the_date.split("-")] +the_time = the_time.split(".")[0] +hours, minutes, seconds = [int(x) for x in the_time.split(":")] # We can also fill in these extra nice things -year_day = json['day_of_year'] -week_day = json['day_of_week'] -is_dst = json['dst'] +year_day = json["day_of_year"] +week_day = json["day_of_week"] +is_dst = json["dst"] -now = time.struct_time((year, month, mday, hours, minutes, seconds, week_day, year_day, is_dst)) +now = time.struct_time( + (year, month, mday, hours, minutes, seconds, week_day, year_day, is_dst) +) print(now) the_rtc.datetime = now diff --git a/examples/esp32spi_secrets.py b/examples/esp32spi_secrets.py index 3c22224..6a73799 100644 --- a/examples/esp32spi_secrets.py +++ b/examples/esp32spi_secrets.py @@ -2,9 +2,9 @@ # If you put them in the code you risk committing that info or sharing it secrets = { - 'ssid' : 'yourssid', - 'password' : 'yourpassword', - 'timezone' : "America/New_York", # Check http://worldtimeapi.org/timezones - 'aio_username' : 'youraiousername', - 'aio_key' : 'youraiokey', + "ssid": "yourssid", + "password": "yourpassword", + "timezone": "America/New_York", # Check http://worldtimeapi.org/timezones + "aio_username": "youraiousername", + "aio_key": "youraiokey", } diff --git a/examples/esp32spi_simpletest.py b/examples/esp32spi_simpletest.py index ccefb1c..fcf656b 100644 --- a/examples/esp32spi_simpletest.py +++ b/examples/esp32spi_simpletest.py @@ -37,34 +37,36 @@ print("MAC addr:", [hex(i) for i in esp.MAC_address]) for ap in esp.scan_networks(): - print("\t%s\t\tRSSI: %d" % (str(ap['ssid'], 'utf-8'), ap['rssi'])) + print("\t%s\t\tRSSI: %d" % (str(ap["ssid"], "utf-8"), ap["rssi"])) print("Connecting to AP...") while not esp.is_connected: try: - esp.connect_AP(b'MY_SSID_NAME', b'MY_SSID_PASSWORD') + esp.connect_AP(b"MY_SSID_NAME", b"MY_SSID_PASSWORD") except RuntimeError as e: - print("could not connect to AP, retrying: ",e) + print("could not connect to AP, retrying: ", e) continue -print("Connected to", str(esp.ssid, 'utf-8'), "\tRSSI:", esp.rssi) +print("Connected to", str(esp.ssid, "utf-8"), "\tRSSI:", esp.rssi) print("My IP address is", esp.pretty_ip(esp.ip_address)) -print("IP lookup adafruit.com: %s" % esp.pretty_ip(esp.get_host_by_name("adafruit.com"))) +print( + "IP lookup adafruit.com: %s" % esp.pretty_ip(esp.get_host_by_name("adafruit.com")) +) print("Ping google.com: %d ms" % esp.ping("google.com")) -#esp._debug = True +# esp._debug = True print("Fetching text from", TEXT_URL) r = requests.get(TEXT_URL) -print('-'*40) +print("-" * 40) print(r.text) -print('-'*40) +print("-" * 40) r.close() print() print("Fetching json from", JSON_URL) r = requests.get(JSON_URL) -print('-'*40) +print("-" * 40) print(r.json()) -print('-'*40) +print("-" * 40) r.close() print("Done!") diff --git a/examples/esp32spi_wpa2ent_aio_post.py b/examples/esp32spi_wpa2ent_aio_post.py index 2123c05..18a6847 100644 --- a/examples/esp32spi_wpa2ent_aio_post.py +++ b/examples/esp32spi_wpa2ent_aio_post.py @@ -30,23 +30,32 @@ spi = busio.SPI(board.SCK, board.MOSI, board.MISO) esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset) """Use below for Most Boards""" -status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2) # Uncomment for Most Boards +status_light = neopixel.NeoPixel( + board.NEOPIXEL, 1, brightness=0.2 +) # Uncomment for Most Boards """Uncomment below for ItsyBitsy M4""" -#status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2) -wifi = ESPSPI_WiFiManager(esp, secrets, status_light, connection_type=ESPSPI_WiFiManager.ENTERPRISE) +# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2) +wifi = ESPSPI_WiFiManager( + esp, secrets, status_light, connection_type=ESPSPI_WiFiManager.ENTERPRISE +) counter = 0 while True: try: - print("Posting data...", end='') + print("Posting data...", end="") data = counter - feed = 'test' - payload = {'value':data} + feed = "test" + payload = {"value": data} response = wifi.post( - "https://io.adafruit.com/api/v2/"+secrets['aio_username']+"/feeds/"+feed+"/data", + "https://io.adafruit.com/api/v2/" + + secrets["aio_username"] + + "/feeds/" + + feed + + "/data", json=payload, - headers={"X-AIO-KEY":secrets['aio_key']}) + headers={"X-AIO-KEY": secrets["aio_key"]}, + ) print(response.json()) response.close() counter = counter + 1 diff --git a/examples/esp32spi_wpa2ent_simpletest.py b/examples/esp32spi_wpa2ent_simpletest.py index 553590e..d7c2daa 100644 --- a/examples/esp32spi_wpa2ent_simpletest.py +++ b/examples/esp32spi_wpa2ent_simpletest.py @@ -21,8 +21,12 @@ # support Python 3.x and thus, CircuitPython def version_compare(version1, version2): def normalize(v): - return [int(x) for x in re.sub(r'(\.0+)*$','', v).split(".")] - return (normalize(version1) > normalize(version2)) - (normalize(version1) < normalize(version2)) + return [int(x) for x in re.sub(r"(\.0+)*$", "", v).split(".")] + + return (normalize(version1) > normalize(version2)) - ( + normalize(version1) < normalize(version2) + ) + print("ESP32 SPI WPA2 Enterprise test") @@ -48,30 +52,31 @@ def normalize(v): # Get the ESP32 fw version number, remove trailing byte off the returned bytearray # and then convert it to a string for prettier printing and later comparison -firmware_version = ''.join([chr(b) for b in esp.firmware_version[:-1]]) +firmware_version = "".join([chr(b) for b in esp.firmware_version[:-1]]) print("Firmware vers.", firmware_version) print("MAC addr:", [hex(i) for i in esp.MAC_address]) # WPA2 Enterprise support was added in fw ver 1.3.0. Check that the ESP32 # is running at least that version, otherwise, bail out -assert version_compare(firmware_version, "1.3.0") >= 0, ( - "Incorrect ESP32 firmware version; >= 1.3.0 required.") +assert ( + version_compare(firmware_version, "1.3.0") >= 0 +), "Incorrect ESP32 firmware version; >= 1.3.0 required." # Set up the SSID you would like to connect to # Note that we need to call wifi_set_network prior # to calling wifi_set_enable. -esp.wifi_set_network(b'YOUR_SSID_HERE') +esp.wifi_set_network(b"YOUR_SSID_HERE") # If your WPA2 Enterprise network requires an anonymous # identity to be set, you may set that here -esp.wifi_set_entidentity(b'') +esp.wifi_set_entidentity(b"") # Set the WPA2 Enterprise username you'd like to use -esp.wifi_set_entusername(b'MY_USERNAME') +esp.wifi_set_entusername(b"MY_USERNAME") # Set the WPA2 Enterprise password you'd like to use -esp.wifi_set_entpassword(b'MY_PASSWORD') +esp.wifi_set_entpassword(b"MY_PASSWORD") # Once the network settings have been configured, # we need to enable WPA2 Enterprise mode on the ESP32 @@ -80,13 +85,15 @@ def normalize(v): # Wait for the network to come up print("Connecting to AP...") while not esp.is_connected: - print(".", end = "") + print(".", end="") time.sleep(2) print("") -print("Connected to", str(esp.ssid, 'utf-8'), "\tRSSI:", esp.rssi) +print("Connected to", str(esp.ssid, "utf-8"), "\tRSSI:", esp.rssi) print("My IP address is", esp.pretty_ip(esp.ip_address)) -print("IP lookup adafruit.com: %s" % esp.pretty_ip(esp.get_host_by_name("adafruit.com"))) +print( + "IP lookup adafruit.com: %s" % esp.pretty_ip(esp.get_host_by_name("adafruit.com")) +) print("Ping google.com: %d ms" % esp.ping("google.com")) print("Done!") diff --git a/examples/server/esp32spi_wsgiserver.py b/examples/server/esp32spi_wsgiserver.py index ec45573..1f124cd 100755 --- a/examples/server/esp32spi_wsgiserver.py +++ b/examples/server/esp32spi_wsgiserver.py @@ -37,13 +37,17 @@ # esp32_reset = DigitalInOut(board.D5) spi = busio.SPI(board.SCK, board.MOSI, board.MISO) -esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset) # pylint: disable=line-too-long +esp = adafruit_esp32spi.ESP_SPIcontrol( + spi, esp32_cs, esp32_ready, esp32_reset +) # pylint: disable=line-too-long print("MAC addr:", [hex(i) for i in esp.MAC_address]) print("MAC addr actual:", [hex(i) for i in esp.MAC_address_actual]) # Use below for Most Boards -status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2) # Uncomment for Most Boards +status_light = neopixel.NeoPixel( + board.NEOPIXEL, 1, brightness=0.2 +) # Uncomment for Most Boards # Uncomment below for ItsyBitsy M4 # import adafruit_dotstar as dotstar # status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=1) @@ -62,6 +66,7 @@ # wifi = wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light) # wifi.create_ap() + class SimpleWSGIApplication: """ An example of a simple WSGI Application that supports @@ -69,7 +74,7 @@ class SimpleWSGIApplication: """ INDEX = "/index.html" - CHUNK_SIZE = 8912 # max number of bytes to read at once when reading files + CHUNK_SIZE = 8912 # max number of bytes to read at once when reading files def __init__(self, static_dir=None, debug=False): self._debug = debug @@ -94,15 +99,21 @@ def __call__(self, environ, start_response): headers = [] resp_data = [] - key = self._get_listener_key(environ["REQUEST_METHOD"].lower(), environ["PATH_INFO"]) + key = self._get_listener_key( + environ["REQUEST_METHOD"].lower(), environ["PATH_INFO"] + ) if key in self._listeners: status, headers, resp_data = self._listeners[key](environ) if environ["REQUEST_METHOD"].lower() == "get" and self._static: path = environ["PATH_INFO"] if path in self._static_files: - status, headers, resp_data = self.serve_file(path, directory=self._static) + status, headers, resp_data = self.serve_file( + path, directory=self._static + ) elif path == "/" and self.INDEX in self._static_files: - status, headers, resp_data = self.serve_file(self.INDEX, directory=self._static) + status, headers, resp_data = self.serve_file( + self.INDEX, directory=self._static + ) self._start_response(status, headers) return resp_data @@ -128,8 +139,9 @@ def serve_file(self, file_path, directory=None): headers = [("Content-Type", self._get_content_type(file_path))] full_path = file_path if not directory else directory + file_path + def resp_iter(): - with open(full_path, 'rb') as file: + with open(full_path, "rb") as file: while True: chunk = file.read(self.CHUNK_SIZE) if chunk: @@ -139,16 +151,16 @@ def resp_iter(): return (status, headers, resp_iter()) - def _log_environ(self, environ): # pylint: disable=no-self-use + def _log_environ(self, environ): # pylint: disable=no-self-use print("environ map:") for name, value in environ.items(): print(name, value) - def _get_listener_key(self, method, path): # pylint: disable=no-self-use + def _get_listener_key(self, method, path): # pylint: disable=no-self-use return "{0}|{1}".format(method.lower(), path) - def _get_content_type(self, file): # pylint: disable=no-self-use - ext = file.split('.')[-1] + def _get_content_type(self, file): # pylint: disable=no-self-use + ext = file.split(".")[-1] if ext in ("html", "htm"): return "text/html" if ext == "js": @@ -161,24 +173,28 @@ def _get_content_type(self, file): # pylint: disable=no-self-use return "image/png" return "text/plain" + # Our HTTP Request handlers -def led_on(environ): # pylint: disable=unused-argument +def led_on(environ): # pylint: disable=unused-argument print("led on!") status_light.fill((0, 0, 100)) return web_app.serve_file("static/index.html") -def led_off(environ): # pylint: disable=unused-argument + +def led_off(environ): # pylint: disable=unused-argument print("led off!") status_light.fill(0) return web_app.serve_file("static/index.html") -def led_color(environ): # pylint: disable=unused-argument + +def led_color(environ): # pylint: disable=unused-argument json = json_module.loads(environ["wsgi.input"].getvalue()) print(json) rgb_tuple = (json.get("r"), json.get("g"), json.get("b")) status_light.fill(rgb_tuple) return ("200 OK", [], []) + # Here we create our application, setting the static directory location # and registering the above request_handlers for specific HTTP requests # we want to listen and respond to. @@ -186,13 +202,21 @@ def led_color(environ): # pylint: disable=unused-argument try: static_files = os.listdir(static) if "index.html" not in static_files: - raise RuntimeError(""" + raise RuntimeError( + """ This example depends on an index.html, but it isn't present. - Please add it to the {0} directory""".format(static)) + Please add it to the {0} directory""".format( + static + ) + ) except (OSError) as e: - raise RuntimeError(""" + raise RuntimeError( + """ This example depends on a static asset directory. - Please create one named {0} in the root of the device filesystem.""".format(static)) + Please create one named {0} in the root of the device filesystem.""".format( + static + ) + ) web_app = SimpleWSGIApplication(static_dir=static) web_app.on("GET", "/led_on", led_on) @@ -216,4 +240,3 @@ def led_color(environ): # pylint: disable=unused-argument print("Failed to update server, restarting ESP32\n", e) wifi.reset() continue - \ No newline at end of file diff --git a/setup.py b/setup.py index 4800b8b..7df789f 100644 --- a/setup.py +++ b/setup.py @@ -6,6 +6,7 @@ """ from setuptools import setup, find_packages + # To use a consistent encoding from codecs import open from os import path @@ -13,52 +14,40 @@ here = path.abspath(path.dirname(__file__)) # Get the long description from the README file -with open(path.join(here, 'README.rst'), encoding='utf-8') as f: +with open(path.join(here, "README.rst"), encoding="utf-8") as f: long_description = f.read() setup( - name='adafruit-circuitpython-esp32spi', - + name="adafruit-circuitpython-esp32spi", use_scm_version=True, - setup_requires=['setuptools_scm'], - - description='CircuitPython driver library for using ESP32 as WiFi co-processor using SPI', + setup_requires=["setuptools_scm"], + description="CircuitPython driver library for using ESP32 as WiFi co-processor using SPI", long_description=long_description, - long_description_content_type='text/x-rst', - + long_description_content_type="text/x-rst", # The project's main homepage. - url='https://github.com/adafruit/Adafruit_CircuitPython_ESP32SPI', - + url="https://github.com/adafruit/Adafruit_CircuitPython_ESP32SPI", # Author details - author='Adafruit Industries', - author_email='circuitpython@adafruit.com', - - install_requires=[ - 'Adafruit-Blinka', - 'adafruit-circuitpython-busdevice' - ], - + author="Adafruit Industries", + author_email="circuitpython@adafruit.com", + install_requires=["Adafruit-Blinka", "adafruit-circuitpython-busdevice"], # Choose your license - license='MIT', - + license="MIT", # See https://pypi.python.org/pypi?%3Aaction=list_classifiers classifiers=[ - 'Development Status :: 3 - Alpha', - 'Intended Audience :: Developers', - 'Topic :: Software Development :: Libraries', - 'Topic :: System :: Hardware', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Topic :: Software Development :: Libraries", + "Topic :: System :: Hardware", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.4", + "Programming Language :: Python :: 3.5", ], - # What does your project relate to? - keywords='adafruit blinka circuitpython micropython esp32spi wifi esp32', - + keywords="adafruit blinka circuitpython micropython esp32spi wifi esp32", # You can just specify the packages manually here if your project is # simple. Or you can use find_packages(). # TODO: IF LIBRARY FILES ARE A PACKAGE FOLDER, # CHANGE `py_modules=['...']` TO `packages=['...']` - packages=['adafruit_esp32spi'], + packages=["adafruit_esp32spi"], )