From b544f2ff5d4665e436623c6c2f9dcffdbf871a1a Mon Sep 17 00:00:00 2001 From: Dan Halbert Date: Wed, 7 Aug 2024 19:50:28 -0400 Subject: [PATCH 1/6] changes from MicroPython v1.22-release; CIRCUITPY-CHANGES annotations --- asyncio/__init__.py | 10 ++++-- asyncio/core.py | 67 +++++++++++++++++++++++++++++----- asyncio/event.py | 41 +++++++++++++-------- asyncio/funcs.py | 66 ++++++++++++++++++++++------------ asyncio/lock.py | 12 ++++++- asyncio/manifest.py | 24 ------------- asyncio/stream.py | 85 ++++++++++++++++++++++++++++++++++++-------- asyncio/task.py | 14 ++++---- asyncio/traceback.py | 9 ++--- 9 files changed, 228 insertions(+), 100 deletions(-) delete mode 100644 asyncio/manifest.py diff --git a/asyncio/__init__.py b/asyncio/__init__.py index ce8837d..9b07a5b 100644 --- a/asyncio/__init__.py +++ b/asyncio/__init__.py @@ -1,10 +1,12 @@ -# SPDX-FileCopyrightText: 2019 Damien P. George +# CIRCUITPY-CHANGE: SPDX +# SPDX-FileCopyrightText: 2019-2020 Damien P. George # # SPDX-License-Identifier: MIT -# -# MicroPython uasyncio module + +# MicroPython asyncio module # MIT license; Copyright (c) 2019 Damien P. George # +# CIRCUITPY-CHANGE # This code comes from MicroPython, and has not been run through black or pylint there. # Altering these files significantly would make merging difficult, so we will not use # pylint or black. @@ -13,6 +15,7 @@ from .core import * +# CIRCUITPY-CHANGE: use CircuitPython version __version__ = "0.0.0+auto.0" __repo__ = "https://github.com/Adafruit/Adafruit_CircuitPython_asyncio.git" @@ -29,6 +32,7 @@ "StreamWriter": "stream", } + # Lazy loader, effectively does: # global attr # from .mod import attr diff --git a/asyncio/core.py b/asyncio/core.py index 26a97ba..eaeef81 100644 --- a/asyncio/core.py +++ b/asyncio/core.py @@ -1,23 +1,23 @@ -# SPDX-FileCopyrightText: 2019 Damien P. George +# CIRCUITPY-CHANGE: SPDX +# SPDX-FileCopyrightText: 2019-2020 Damien P. George # # SPDX-License-Identifier: MIT -# -# MicroPython uasyncio module + +# MicroPython asyncio module # MIT license; Copyright (c) 2019 Damien P. George # +# # CIRCUITPY-CHANGE: use CircuitPython version # This code comes from MicroPython, and has not been run through black or pylint there. # Altering these files significantly would make merging difficult, so we will not use # pylint or black. # pylint: skip-file # fmt: off -""" -Core -==== -""" +# CIRCUITPY-CHANGE: use our ticks library from adafruit_ticks import ticks_ms as ticks, ticks_diff, ticks_add import sys, select +# CIRCUITPY-CHANGE: CircuitPython traceback support try: from traceback import print_exception except: @@ -26,6 +26,7 @@ # Import TaskQueue and Task, preferring built-in C code over Python code try: from _asyncio import TaskQueue, Task +# CIRCUITPY-CHANGE: more specific error checking except ImportError: from .task import TaskQueue, Task @@ -33,6 +34,7 @@ # Exceptions +# CIRCUITPY-CHANGE # Depending on the release of CircuitPython these errors may or may not # exist in the C implementation of `_asyncio`. However, when they # do exist, they must be preferred over the Python code. @@ -50,6 +52,7 @@ class InvalidStateError(Exception): class TimeoutError(Exception): + # CIRCUITPY-CHANGE: docstring """Raised when waiting for a task longer than the specified timeout.""" pass @@ -62,6 +65,7 @@ class TimeoutError(Exception): ################################################################################ # Sleep functions + # "Yield" once, then raise StopIteration class SingletonGenerator: def __init__(self): @@ -71,11 +75,13 @@ def __init__(self): def __iter__(self): return self + # CIRCUITPY-CHANGE: provide await def __await__(self): return self def __next__(self): if self.state is not None: + # CIRCUITPY-CHANGE: when 8.x support is discontinued, change to .push() _task_queue.push_sorted(cur_task, self.state) self.state = None return None @@ -87,11 +93,13 @@ def __next__(self): # Pause task execution for the given time (integer in milliseconds, uPy extension) # Use a SingletonGenerator to do it without allocating on the heap def sleep_ms(t, sgen=SingletonGenerator()): + # CIRCUITPY-CHANGE: doc """Sleep for *t* milliseconds. This is a coroutine, and a MicroPython extension. """ + # CIRCUITPY-CHANGE: add debugging hint assert sgen.state is None, "Check for a missing `await` in your code" sgen.state = ticks_add(ticks(), max(0, t)) return sgen @@ -99,6 +107,7 @@ def sleep_ms(t, sgen=SingletonGenerator()): # Pause task execution for the given time (in seconds) def sleep(t): + # CIRCUITPY-CHANGE: doc """Sleep for *t* seconds This is a coroutine. @@ -107,6 +116,7 @@ def sleep(t): return sleep_ms(int(t * 1000)) +# CIRCUITPY-CHANGE: see https://github.com/adafruit/Adafruit_CircuitPython_asyncio/pull/30 ################################################################################ # "Never schedule" object" # Don't re-schedule the object that awaits _never(). @@ -166,12 +176,16 @@ def _dequeue(self, s): del self.map[id(s)] self.poller.unregister(s) + # CIRCUITPY-CHANGE: async async def queue_read(self, s): self._enqueue(s, 0) + # CIRCUITPY-CHANGE: do not reschedule await _never() + # CIRCUITPY-CHANGE: async async def queue_write(self, s): self._enqueue(s, 1) + # CIRCUITPY-CHANGE: do not reschedule await _never() def remove(self, task): @@ -193,10 +207,12 @@ def wait_io_event(self, dt): # print('poll', s, sm, ev) if ev & ~select.POLLOUT and sm[0] is not None: # POLLIN or error + # CIRCUITPY-CHANGE: when 8.x support is discontinued, change to .push() _task_queue.push_head(sm[0]) sm[0] = None if ev & ~select.POLLIN and sm[1] is not None: # POLLOUT or error + # CIRCUITPY-CHANGE: when 8.x support is discontinued, change to .push() _task_queue.push_head(sm[1]) sm[1] = None if sm[0] is None and sm[1] is None: @@ -210,6 +226,7 @@ def wait_io_event(self, dt): ################################################################################ # Main run loop + # Ensure the awaitable is a task def _promote_to_task(aw): return aw if isinstance(aw, Task) else create_task(aw) @@ -217,6 +234,7 @@ def _promote_to_task(aw): # Create and schedule a new task from a coroutine def create_task(coro): + # CIRCUITPY-CHANGE: doc """Create a new task from the given coroutine and schedule it to run. Returns the corresponding `Task` object. @@ -225,12 +243,14 @@ def create_task(coro): if not hasattr(coro, "send"): raise TypeError("coroutine expected") t = Task(coro, globals()) + # CIRCUITPY-CHANGE: when 8.x support is discontinued, change to .push() _task_queue.push_head(t) return t # Keep scheduling tasks until there are none left to schedule def run_until_complete(main_task=None): + # CIRCUITPY-CHANGE: doc """Run the given *main_task* until it completes.""" global cur_task @@ -247,11 +267,13 @@ def run_until_complete(main_task=None): dt = max(0, ticks_diff(t.ph_key, ticks())) elif not _io_queue.map: # No tasks can be woken so finished running + cur_task = None return # print('(poll {})'.format(dt), len(_io_queue.map)) _io_queue.wait_io_event(dt) # Get next task to run and continue it + # CIRCUITPY-CHANGE: when 8.x support is discontinued, change to .pop() t = _task_queue.pop_head() cur_task = t try: @@ -271,6 +293,7 @@ def run_until_complete(main_task=None): assert t.data is None # This task is done, check if it's the main task and then loop should stop if t is main_task: + cur_task = None if isinstance(er, StopIteration): return er.value raise er @@ -288,6 +311,7 @@ def run_until_complete(main_task=None): else: # Schedule any other tasks waiting on the completion of this task. while t.state.peek(): + # CIRCUITPY-CHANGE: when 8.x support is discontinued, change to .push() and .pop() _task_queue.push_head(t.state.pop_head()) waiting = True # "False" indicates that the task is complete and has been await'ed on. @@ -296,12 +320,18 @@ def run_until_complete(main_task=None): # An exception ended this detached task, so queue it for later # execution to handle the uncaught exception if no other task retrieves # the exception in the meantime (this is handled by Task.throw). + # CIRCUITPY-CHANGE: when 8.x support is discontinued, change to .push() _task_queue.push_head(t) # Save return value of coro to pass up to caller. t.data = er elif t.state is None: # Task is already finished and nothing await'ed on the task, # so call the exception handler. + + # Save exception raised by the coro for later use. + t.data = exc + + # Create exception context and call the exception handler. _exc_context["exception"] = exc _exc_context["future"] = t Loop.call_exception_handler(_exc_context) @@ -309,6 +339,7 @@ def run_until_complete(main_task=None): # Create a new task from a coroutine and run it until it finishes def run(coro): + # CIRCUITPY-CHANGE: doc """Create a new task from the given coroutine and run it until it completes. Returns the value returned by *coro*. @@ -325,20 +356,24 @@ async def _stopper(): pass +cur_task = None _stop_task = None class Loop: + # CIRCUITPY-CHANGE: doc """Class representing the event loop""" _exc_handler = None def create_task(coro): + # CIRCUITPY-CHANGE: doc """Create a task from the given *coro* and return the new `Task` object.""" return create_task(coro) def run_forever(): + # CIRCUITPY-CHANGE: doc """Run the event loop until `Loop.stop()` is called.""" global _stop_task @@ -347,6 +382,7 @@ def run_forever(): # TODO should keep running until .stop() is called, even if there're no tasks left def run_until_complete(aw): + # CIRCUITPY-CHANGE: doc """Run the given *awaitable* until it completes. If *awaitable* is not a task then it will be promoted to one. """ @@ -354,20 +390,24 @@ def run_until_complete(aw): return run_until_complete(_promote_to_task(aw)) def stop(): + # CIRCUITPY-CHANGE: doc """Stop the event loop""" global _stop_task if _stop_task is not None: + # CIRCUITPY-CHANGE: when 8.x support is discontinued, change to .push() _task_queue.push_head(_stop_task) # If stop() is called again, do nothing _stop_task = None def close(): + # CIRCUITPY-CHANGE: doc """Close the event loop.""" pass def set_exception_handler(handler): + # CIRCUITPY-CHANGE: doc """Set the exception handler to call when a Task raises an exception that is not caught. The *handler* should accept two arguments: ``(loop, context)`` """ @@ -375,6 +415,7 @@ def set_exception_handler(handler): Loop._exc_handler = handler def get_exception_handler(): + # CIRCUITPY-CHANGE: doc """Get the current exception handler. Returns the handler, or ``None`` if no custom handler is set. """ @@ -382,12 +423,15 @@ def get_exception_handler(): return Loop._exc_handler def default_exception_handler(loop, context): + # CIRCUITPY-CHANGE: doc """The default exception handler that is called.""" + # CIRCUITPY_CHANGE: use CircuitPython traceback printing exc = context["exception"] print_exception(None, exc, exc.__traceback__) def call_exception_handler(context): + # CIRCUITPY-CHANGE: doc """Call the current exception handler. The argument *context* is passed through and is a dictionary containing keys: ``'message'``, ``'exception'``, ``'future'`` @@ -397,29 +441,36 @@ def call_exception_handler(context): # The runq_len and waitq_len arguments are for legacy uasyncio compatibility def get_event_loop(runq_len=0, waitq_len=0): - """Return the event loop used to schedule and run tasks. See `Loop`.""" + # CIRCUITPY-CHANGE: doc + """Return the event loop used to schedule and run tasks. See `Loop`. Deprecated and will be removed later.""" return Loop def current_task(): + # CIRCUITPY-CHANGE: doc """Return the `Task` object associated with the currently running task.""" + if cur_task is None: + raise RuntimeError("no running event loop") return cur_task def new_event_loop(): + # CIRCUITPY-CHANGE: doc """Reset the event loop and return it. **NOTE**: Since MicroPython only has a single event loop, this function just resets the loop's state, it does not create a new one """ + # CIRCUITPY-CHANGE: add _exc_context, cur_task global _task_queue, _io_queue, _exc_context, cur_task # TaskQueue of Task instances _task_queue = TaskQueue() # Task queue and poller for stream IO _io_queue = IOQueue() + # CIRCUITPY-CHANGE: exception info cur_task = None _exc_context['exception'] = None _exc_context['future'] = None diff --git a/asyncio/event.py b/asyncio/event.py index a402d26..362d2e1 100644 --- a/asyncio/event.py +++ b/asyncio/event.py @@ -1,24 +1,24 @@ +# CIRCUITPY-CHANGE: SPDX # SPDX-FileCopyrightText: 2019-2020 Damien P. George # # SPDX-License-Identifier: MIT -# -# MicroPython uasyncio module + +# MicroPython asyncio module # MIT license; Copyright (c) 2019-2020 Damien P. George # +# CIRCUITPY-CHANGE # This code comes from MicroPython, and has not been run through black or pylint there. # Altering these files significantly would make merging difficult, so we will not use # pylint or black. # pylint: skip-file # fmt: off -""" -Events -====== -""" from . import core + # Event class for primitive events that can be waited on, set, and cleared class Event: + # CIRCUITPY-CHANGE: doc """Create a new event which can be used to synchronize tasks. Events start in the cleared state. """ @@ -28,11 +28,13 @@ def __init__(self): self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event def is_set(self): + # CIRCUITPY-CHANGE: doc """Returns ``True`` if the event is set, ``False`` otherwise.""" return self.state def set(self): + # CIRCUITPY-CHANGE: doc """Set the event. Any tasks waiting on the event will be scheduled to run. """ @@ -40,15 +42,19 @@ def set(self): # Note: This must not be called from anything except the thread running # the asyncio loop (i.e. neither hard or soft IRQ, or a different thread). while self.waiting.peek(): + # CIRCUITPY-CHANGE: when 8.x support is discontinued, change to .push() and .pop() core._task_queue.push_head(self.waiting.pop_head()) self.state = True def clear(self): + # CIRCUITPY-CHANGE: doc """Clear the event.""" self.state = False + # CIRCUITPY-CHANGE: async async def wait(self): + # CIRCUITPY-CHANGE: doc """Wait for the event to be set. If the event is already set then it returns immediately. @@ -57,9 +63,11 @@ async def wait(self): if not self.state: # Event not set, put the calling task on the event's waiting queue + # CIRCUITPY-CHANGE: when 8.x support is discontinued, change to .push() self.waiting.push_head(core.cur_task) # Set calling task's data to the event's queue so it can be removed if needed core.cur_task.data = self.waiting + # CIRCUITPY-CHANGE: use await; never reschedule await core._never() return True @@ -67,26 +75,29 @@ async def wait(self): # MicroPython-extension: This can be set from outside the asyncio event loop, # such as other threads, IRQs or scheduler context. Implementation is a stream # that asyncio will poll until a flag is set. -# Note: Unlike Event, this is self-clearing. +# Note: Unlike Event, this is self-clearing after a wait(). try: - import uio + import io - class ThreadSafeFlag(uio.IOBase): + class ThreadSafeFlag(io.IOBase): def __init__(self): - self._flag = 0 + self.state = 0 def ioctl(self, req, flags): if req == 3: # MP_STREAM_POLL - return self._flag * flags - return None + return self.state * flags + return -1 # Other requests are unsupported def set(self): - self._flag = 1 + self.state = 1 + + def clear(self): + self.state = 0 async def wait(self): - if not self._flag: + if not self.state: yield core._io_queue.queue_read(self) - self._flag = 0 + self.state = 0 except ImportError: pass diff --git a/asyncio/funcs.py b/asyncio/funcs.py index b1bb24a..48d302d 100644 --- a/asyncio/funcs.py +++ b/asyncio/funcs.py @@ -1,20 +1,17 @@ +# CIRCUITPY-CHANGE: SPDX # SPDX-FileCopyrightText: 2019-2020 Damien P. George # # SPDX-License-Identifier: MIT -# -# MicroPython uasyncio module + +# MicroPython asyncio module # MIT license; Copyright (c) 2019-2022 Damien P. George # +# CIRCUITPY-CHANGE # This code comes from MicroPython, and has not been run through black or pylint there. # Altering these files significantly would make merging difficult, so we will not use # pylint or black. # pylint: skip-file # fmt: off -""" -Functions -========= -""" - from . import core @@ -36,6 +33,7 @@ async def _run(waiter, aw): waiter.data = core.CancelledError(status, result) async def wait_for(aw, timeout, sleep=core.sleep): + # CIRCUITPY-CHANGE: doc """Wait for the *aw* awaitable to complete, but cancel if it takes longer than *timeout* seconds. If *aw* is not a task then a task will be created from it. @@ -59,6 +57,7 @@ async def wait_for(aw, timeout, sleep=core.sleep): # Wait for the timeout to elapse. await sleep(timeout) except core.CancelledError as er: + # CIRCUITPY-CHANGE: more general fetching of exception arg status = er.args[0] if er.args else None if status is None: # This wait_for was cancelled externally, so cancel aw and re-raise. @@ -78,6 +77,7 @@ async def wait_for(aw, timeout, sleep=core.sleep): def wait_for_ms(aw, timeout): + # CIRCUITPY-CHANGE: doc """Similar to `wait_for` but *timeout* is an integer in milliseconds. This is a coroutine, and a MicroPython extension. @@ -92,12 +92,15 @@ def remove(t): pass +# CIRCUITPY-CHANGE: async async def gather(*aws, return_exceptions=False): + # CIRCUITPY-CHANGE: doc """Run all *aws* awaitables concurrently. Any *aws* that are not tasks are promoted to tasks. Returns a list of return values of all *aws* """ + # CIRCUITPY-CHANGE: no awaitables, so nothing to gather if not aws: return [] @@ -119,28 +122,42 @@ def done(t, er): # Still some sub-tasks running. return # Gather waiting is done, schedule the main gather task. + # CIRCUITPY-CHANGE: when 8.x support is discontinued, change to .push() core._task_queue.push_head(gather_task) + # Prepare the sub-tasks for the gather. + # The `state` variable counts the number of tasks to wait for, and can be negative + # if the gather should not run at all (because a task already had an exception). ts = [core._promote_to_task(aw) for aw in aws] + state = 0 for i in range(len(ts)): - if ts[i].state is not True: - # Task is not running, gather not currently supported for this case. + if ts[i].state is True: + # Task is running, register the callback to call when the task is done. + ts[i].state = done + state += 1 + elif not ts[i].state: + # Task finished already. + if not isinstance(ts[i].data, StopIteration): + # Task finished by raising an exception. + if not return_exceptions: + # Do not run this gather at all. + state = -len(ts) + else: + # Task being waited on, gather not currently supported for this case. raise RuntimeError("can't gather") - # Register the callback to call when the task is done. - ts[i].state = done # Set the state for execution of the gather. gather_task = core.cur_task - state = len(ts) cancel_all = False - # Wait for the a sub-task to need attention. - gather_task.data = _Remove - try: - await core._never() - except core.CancelledError as er: - cancel_all = True - state = er + # Wait for a sub-task to need attention (if there are any to wait for). + if state > 0: + gather_task.data = _Remove + try: + yield + except core.CancelledError as er: + cancel_all = True + state = er # Clean up tasks. for i in range(len(ts)): @@ -153,12 +170,17 @@ def done(t, er): # Sub-task ran to completion, get its return value. ts[i] = ts[i].data.value else: - # Sub-task had an exception with return_exceptions==True, so get its exception. - ts[i] = ts[i].data + # Sub-task had an exception. + if return_exceptions: + # Get the sub-task exception to return in the list of return values. + ts[i] = ts[i].data + elif isinstance(state, int): + # Raise the sub-task exception, if there is not already an exception to raise. + state = ts[i].data # Either this gather was cancelled, or one of the sub-tasks raised an exception with # return_exceptions==False, so reraise the exception here. - if state is not 0: + if state: raise state # Return the list of return values of each sub-task. diff --git a/asyncio/lock.py b/asyncio/lock.py index 71c972f..6dc8bbc 100644 --- a/asyncio/lock.py +++ b/asyncio/lock.py @@ -1,10 +1,12 @@ +# CIRCUITPY-CHANGE: SPDX # SPDX-FileCopyrightText: 2019-2020 Damien P. George # # SPDX-License-Identifier: MIT # # MicroPython uasyncio module # MIT license; Copyright (c) 2019-2020 Damien P. George -# + +# CICUITPY-CHANGE # This code comes from MicroPython, and has not been run through black or pylint there. # Altering these files significantly would make merging difficult, so we will not use # pylint or black. @@ -19,6 +21,7 @@ # Lock class for primitive mutex capability class Lock: + # CIRCUITPY-CHANGE: doc """Create a new lock which can be used to coordinate tasks. Locks start in the unlocked state. @@ -36,11 +39,13 @@ def __init__(self): self.waiting = core.TaskQueue() def locked(self): + # CIRCUITPY-CHANGE: doc """Returns ``True`` if the lock is locked, otherwise ``False``.""" return self.state == 1 def release(self): + # CIRCUITPY-CHANGE: doc """Release the lock. If any tasks are waiting on the lock then the next one in the queue is scheduled to run and the lock remains locked. Otherwise, no tasks are waiting and the lock becomes unlocked. @@ -50,13 +55,16 @@ def release(self): raise RuntimeError("Lock not acquired") if self.waiting.peek(): # Task(s) waiting on lock, schedule next Task + # CIRCUITPY-CHANGE: when 8.x support is discontinued, change to .pop() and .push() self.state = self.waiting.pop_head() core._task_queue.push_head(self.state) else: # No Task waiting so unlock self.state = 0 + # CIRCUITPY-CHANGE: async, since we don't use yield async def acquire(self): + # CIRCUITPY-CHANGE: doc """Wait for the lock to be in the unlocked state and then lock it in an atomic way. Only one task can acquire the lock at any one time. @@ -65,10 +73,12 @@ async def acquire(self): if self.state != 0: # Lock unavailable, put the calling Task on the waiting queue + # CIRCUITPY-CHANGE: when 8.x support is discontinued, change to .push() self.waiting.push_head(core.cur_task) # Set calling task's data to the lock's queue so it can be removed if needed core.cur_task.data = self.waiting try: + # CIRCUITPY-CHANGE await without rescheduling await core._never() except core.CancelledError as er: if self.state == core.cur_task: diff --git a/asyncio/manifest.py b/asyncio/manifest.py deleted file mode 100644 index 24082ff..0000000 --- a/asyncio/manifest.py +++ /dev/null @@ -1,24 +0,0 @@ -# SPDX-FileCopyrightText: 2019 Damien P. George -# -# SPDX-License-Identifier: MIT -# -# -# This code comes from MicroPython, and has not been run through black or pylint there. -# Altering these files significantly would make merging difficult, so we will not use -# pylint or black. -# pylint: skip-file -# fmt: off - -# This list of frozen files doesn't include task.py because that's provided by the C module. -freeze( - "..", - ( - "uasyncio/__init__.py", - "uasyncio/core.py", - "uasyncio/event.py", - "uasyncio/funcs.py", - "uasyncio/lock.py", - "uasyncio/stream.py", - ), - opt=3, -) diff --git a/asyncio/stream.py b/asyncio/stream.py index 50cc8de..24523b6 100644 --- a/asyncio/stream.py +++ b/asyncio/stream.py @@ -1,3 +1,4 @@ +# CIRCUITPY-CHANGE: SPDX # SPDX-FileCopyrightText: 2019-2020 Damien P. George # # SPDX-License-Identifier: MIT @@ -5,20 +6,18 @@ # MicroPython uasyncio module # MIT license; Copyright (c) 2019-2020 Damien P. George # +# CIRCUITPY-CHANGE # This code comes from MicroPython, and has not been run through black or pylint there. # Altering these files significantly would make merging difficult, so we will not use # pylint or black. # pylint: skip-file # fmt: off -""" -Streams -======= -""" from . import core class Stream: + #CIRCUITPY-CHANGE: doc """This represents a TCP stream connection. To minimise code this class implements both a reader and a writer, and both ``StreamReader`` and ``StreamWriter`` alias to this class. @@ -30,6 +29,7 @@ def __init__(self, s, e={}): self.out_buf = b"" def get_extra_info(self, v): + #CIRCUITPY-CHANGE: doc """Get extra information about the stream, given by *v*. The valid values for *v* are: ``peername``. """ @@ -39,7 +39,9 @@ def get_extra_info(self, v): def close(self): pass + # CIRCUITPY-CHANGE: async async def wait_closed(self): + # CIRCUITPY-CHANGE: doc """Wait for the stream to close. This is a coroutine. @@ -48,7 +50,9 @@ async def wait_closed(self): # TODO yield? self.s.close() + # CIRCUITPY-CHANGE: async async def read(self, n): + # CIRCUITPY-CHANGE: doc """Read up to *n* bytes and return them. This is a coroutine. @@ -57,6 +61,7 @@ async def read(self, n): await core._io_queue.queue_read(self.s) return self.s.read(n) + # CIRCUITPY-CHANGE: async async def readinto(self, buf): """Read up to n bytes into *buf* with n being equal to the length of *buf* @@ -65,10 +70,13 @@ async def readinto(self, buf): This is a coroutine, and a MicroPython extension. """ + # CIRCUITPY-CHANGE: await, not yield await core._io_queue.queue_read(self.s) return self.s.readinto(buf) + # CIRCUITPY-CHANGE: async async def readexactly(self, n): + # CIRCUITPY-CHANGE: doc """Read exactly *n* bytes and return them as a bytes object. Raises an ``EOFError`` exception if the stream ends before reading @@ -79,6 +87,7 @@ async def readexactly(self, n): r = b"" while n: + # CIRCUITPY-CHANGE: await, not yield await core._io_queue.queue_read(self.s) r2 = self.s.read(n) if r2 is not None: @@ -88,7 +97,9 @@ async def readexactly(self, n): n -= len(r2) return r + # CIRCUITPY-CHANGE: async async def readline(self): + # CIRCUITPY-CHANGE: doc """Read a line and return it. This is a coroutine. @@ -96,6 +107,7 @@ async def readline(self): l = b"" while True: + # CIRCUITPY-CHANGE: await, not yield await core._io_queue.queue_read(self.s) l2 = self.s.readline() # may do multiple reads but won't block l += l2 @@ -103,6 +115,7 @@ async def readline(self): return l def write(self, buf): + # CIRCUITPY-CHANGE: doc """Accumulated *buf* to the output buffer. The data is only flushed when `Stream.drain` is called. It is recommended to call `Stream.drain` immediately after calling this function. @@ -114,10 +127,11 @@ def write(self, buf): return if ret is not None: buf = buf[ret:] - self.out_buf += buf + # CIRCUITPY-CHANGE: async async def drain(self): + # CIRCUITPY-CHANGE: doc """Drain (write) all buffered output data out to the stream. This is a coroutine. @@ -126,6 +140,7 @@ async def drain(self): mv = memoryview(self.out_buf) off = 0 while off < len(mv): + # CIRCUITPY-CHANGE: await, not yield await core._io_queue.queue_write(self.s) ret = self.s.write(mv[off:]) if ret is not None: @@ -139,7 +154,9 @@ async def drain(self): # Create a TCP stream connection to a remote host -async def open_connection(host, port): +# CIRCUITPY-CHANGE: async +async def open_connection(host, port, ssl=None, server_hostname=None): + # CIRCUITPY-CHANGE: doc """Open a TCP connection to the given *host* and *port*. The *host* address will be resolved using `socket.getaddrinfo`, which is currently a blocking call. @@ -150,23 +167,34 @@ async def open_connection(host, port): """ from uerrno import EINPROGRESS - import usocket as socket + import socket ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking! s = socket.socket(ai[0], ai[1], ai[2]) s.setblocking(False) - ss = Stream(s) try: s.connect(ai[-1]) except OSError as er: if er.errno != EINPROGRESS: raise er - await core._io_queue.queue_write(s) + # wrap with SSL, if requested + if ssl: + if ssl is True: + import ssl as _ssl + + ssl = _ssl.SSLContext(_ssl.PROTOCOL_TLS_CLIENT) + if not server_hostname: + server_hostname = host + s = ssl.wrap_socket(s, server_hostname=server_hostname, do_handshake_on_connect=False) + s.setblocking(False) + ss = Stream(s) + yield core._io_queue.queue_write(s) return ss, ss # Class representing a TCP stream server, can be closed and used in "async with" class Server: + # CIRCUITPY-CHANGE: doc """This represents the server class returned from `start_server`. It can be used in an ``async with`` statement to close the server upon exit. """ @@ -179,6 +207,7 @@ async def __aexit__(self, exc_type, exc, tb): await self.wait_closed() def close(self): + # CIRCUITPY-CHANGE: doc """Close the server.""" self.task.cancel() @@ -191,20 +220,35 @@ async def wait_closed(self): await self.task - async def _serve(self, s, cb): + async def _serve(self, s, cb, ssl): + self.state = False # Accept incoming connections while True: try: + # CIRCUITPY-CHANGE: await, not yield await core._io_queue.queue_read(s) - except core.CancelledError: - # Shutdown server + except core.CancelledError as er: + # The server task was cancelled, shutdown server and close socket. s.close() - return + if self.state: + # If the server was explicitly closed, ignore the cancellation. + return + else: + # Otherwise e.g. the parent task was cancelled, propagate + # cancellation. + raise er try: s2, addr = s.accept() except: # Ignore a failed accept continue + if ssl: + try: + s2 = ssl.wrap_socket(s2, server_side=True, do_handshake_on_connect=False) + except OSError as e: + core.sys.print_exception(e) + s2.close() + continue s2.setblocking(False) s2s = Stream(s2, {"peername": addr}) core.create_task(cb(s2s, s2s)) @@ -213,6 +257,7 @@ async def _serve(self, s, cb): # Helper function to start a TCP stream server, running as a new task # TODO could use an accept-callback on socket read activity instead of creating a task async def start_server(cb, host, port, backlog=5): + # CIRCUITPY-CHANGE: doc """Start a TCP server on the given *host* and *port*. The *cb* callback will be called with incoming, accepted connections, and be passed 2 arguments: reader writer streams for the connection. @@ -222,7 +267,7 @@ async def start_server(cb, host, port, backlog=5): This is a coroutine. """ - import usocket as socket + import socket # Create and bind server socket. host = socket.getaddrinfo(host, port)[0] # TODO this is blocking! @@ -234,7 +279,17 @@ async def start_server(cb, host, port, backlog=5): # Create and return server object and task. srv = Server() - srv.task = core.create_task(srv._serve(s, cb)) + srv.task = core.create_task(srv._serve(s, cb, ssl)) + try: + # Ensure that the _serve task has been scheduled so that it gets to + # handle cancellation. + await core.sleep_ms(0) + except core.CancelledError as er: + # If the parent task is cancelled during this first sleep, then + # we will leak the task and it will sit waiting for the socket, so + # cancel it. + srv.task.cancel() + raise er return srv diff --git a/asyncio/task.py b/asyncio/task.py index 2e3a6db..af7e700 100644 --- a/asyncio/task.py +++ b/asyncio/task.py @@ -1,3 +1,4 @@ +# CIRCUITPY-CHANGE: SPDX # SPDX-FileCopyrightText: 2019-2020 Damien P. George # # SPDX-License-Identifier: MIT @@ -5,15 +6,12 @@ # MicroPython uasyncio module # MIT license; Copyright (c) 2019-2020 Damien P. George # +# CIRCUITPY-CHANGE # This code comes from MicroPython, and has not been run through black or pylint there. # Altering these files significantly would make merging difficult, so we will not use # pylint or black. # pylint: skip-file # fmt: off -""" -Tasks -===== -""" # This file contains the core TaskQueue based on a pairing heap, and the core Task class. # They can optionally be replaced by C implementations. @@ -130,13 +128,14 @@ def pop(self): def remove(self, v): self.heap = ph_delete(self.heap, v) - # Compatibility aliases, remove after they are no longer used + # CIRCUITPY-CHANGE: Compatibility aliases, remove after 8.x is no longer supported push_head = push push_sorted = push pop_head = pop # Task class representing a coroutine, can be waited on and cancelled. class Task: + # CIRCUITPY-CHANGE: doc """This object wraps a coroutine into a running task. Tasks can be waited on using ``await task``, which will wait for the task to complete and return the return value of the task. @@ -166,11 +165,12 @@ def __iter__(self): raise RuntimeError("can't wait") return self - # CircuitPython needs __await()__. + # CICUITPY-CHANGE: CircuitPython needs __await()__. __await__ = __iter__ def __next__(self): if not self.state: + # CIRCUITPY-CHANGE if self.data is None: # Task finished but has already been sent to the loop's exception handler. raise StopIteration @@ -184,11 +184,13 @@ def __next__(self): core.cur_task.data = self def done(self): + # CIRCUITPY-CHANGE: doc """Whether the task is complete.""" return not self.state def cancel(self): + # CIRCUITPY-CHANGE: doc """Cancel the task by injecting a ``CancelledError`` into it. The task may or may not ignore this exception. """ diff --git a/asyncio/traceback.py b/asyncio/traceback.py index eaf62ea..f87f34e 100644 --- a/asyncio/traceback.py +++ b/asyncio/traceback.py @@ -1,12 +1,9 @@ -# SPDX-FileCopyrightText: 2019-2020 Damien P. George +# SPDX-FileCopyrightText: 2024 by Adafruit Industries # # SPDX-License-Identifier: MIT # -# MicroPython uasyncio module -# MIT license; Copyright (c) 2019-2020 Damien P. George -""" -Fallback traceback module if the system traceback is missing. -""" + +# Note: not present in MicroPython asyncio try: from typing import List From a94cdea31f85928f2afb061859ca6b345bbd2061 Mon Sep 17 00:00:00 2001 From: Dan Halbert Date: Fri, 16 Aug 2024 15:52:58 -0400 Subject: [PATCH 2/6] add get_running_loop(); remove ThreadSafeFlag --- asyncio/core.py | 13 +++++++++++++ asyncio/event.py | 30 +----------------------------- 2 files changed, 14 insertions(+), 29 deletions(-) diff --git a/asyncio/core.py b/asyncio/core.py index eaeef81..8a4eb1d 100644 --- a/asyncio/core.py +++ b/asyncio/core.py @@ -446,6 +446,19 @@ def get_event_loop(runq_len=0, waitq_len=0): return Loop +# CIRCUITPY-CHANGE: added, to match CPython +def get_running_loop(): + """Return the event loop used to schedule and run tasks. See `Loop`.""" + + return Loop + + +def get_event_loop(runq_len=0, waitq_len=0): + # CIRCUITPY-CHANGE: doc + """Return the event loop used to schedule and run tasks. See `Loop`. Deprecated and will be removed later.""" + + # CIRCUITPY-CHANGE + return get_running_loop() def current_task(): # CIRCUITPY-CHANGE: doc diff --git a/asyncio/event.py b/asyncio/event.py index 362d2e1..af81afc 100644 --- a/asyncio/event.py +++ b/asyncio/event.py @@ -72,32 +72,4 @@ async def wait(self): return True -# MicroPython-extension: This can be set from outside the asyncio event loop, -# such as other threads, IRQs or scheduler context. Implementation is a stream -# that asyncio will poll until a flag is set. -# Note: Unlike Event, this is self-clearing after a wait(). -try: - import io - - class ThreadSafeFlag(io.IOBase): - def __init__(self): - self.state = 0 - - def ioctl(self, req, flags): - if req == 3: # MP_STREAM_POLL - return self.state * flags - return -1 # Other requests are unsupported - - def set(self): - self.state = 1 - - def clear(self): - self.state = 0 - - async def wait(self): - if not self.state: - yield core._io_queue.queue_read(self) - self.state = 0 - -except ImportError: - pass +# CIRCUITPY: remove ThreadSafeFlag From bdb94fe0380dfed8cf42d9a827151f328920eb79 Mon Sep 17 00:00:00 2001 From: Dan Halbert Date: Fri, 16 Aug 2024 17:34:21 -0400 Subject: [PATCH 3/6] update pre-commit pylint version --- .pre-commit-config.yaml | 2 +- asyncio/event.py | 2 +- asyncio/funcs.py | 3 +-- asyncio/traceback.py | 2 ++ 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 70ade69..874bf5f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -18,7 +18,7 @@ repos: - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/pycqa/pylint - rev: v2.17.4 + rev: v3.2.6 hooks: - id: pylint name: pylint (library code) diff --git a/asyncio/event.py b/asyncio/event.py index af81afc..1cb1a30 100644 --- a/asyncio/event.py +++ b/asyncio/event.py @@ -72,4 +72,4 @@ async def wait(self): return True -# CIRCUITPY: remove ThreadSafeFlag +# CIRCUITPY: remove ThreadSafeFlag; non-standard extension. diff --git a/asyncio/funcs.py b/asyncio/funcs.py index 48d302d..61840d0 100644 --- a/asyncio/funcs.py +++ b/asyncio/funcs.py @@ -92,8 +92,7 @@ def remove(t): pass -# CIRCUITPY-CHANGE: async -async def gather(*aws, return_exceptions=False): +def gather(*aws, return_exceptions=False): # CIRCUITPY-CHANGE: doc """Run all *aws* awaitables concurrently. Any *aws* that are not tasks are promoted to tasks. diff --git a/asyncio/traceback.py b/asyncio/traceback.py index f87f34e..8e0386e 100644 --- a/asyncio/traceback.py +++ b/asyncio/traceback.py @@ -5,6 +5,8 @@ # Note: not present in MicroPython asyncio +"""CircuitPython-specific traceback support for asyncio.""" + try: from typing import List except ImportError: From cf028c6d6685a843d0de45010da743aa0f626337 Mon Sep 17 00:00:00 2001 From: Dan Halbert Date: Fri, 16 Aug 2024 22:23:00 -0400 Subject: [PATCH 4/6] restore gather as async to fix tests; restore proper await in gather --- asyncio/funcs.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/asyncio/funcs.py b/asyncio/funcs.py index 61840d0..e3c20c3 100644 --- a/asyncio/funcs.py +++ b/asyncio/funcs.py @@ -92,7 +92,8 @@ def remove(t): pass -def gather(*aws, return_exceptions=False): +# CIRCUITPY-CHANGE: async +async def gather(*aws, return_exceptions=False): # CIRCUITPY-CHANGE: doc """Run all *aws* awaitables concurrently. Any *aws* that are not tasks are promoted to tasks. @@ -153,7 +154,7 @@ def done(t, er): if state > 0: gather_task.data = _Remove try: - yield + await core._never() except core.CancelledError as er: cancel_all = True state = er From c6244bf058de4f178aa30a37be74b453dd56a483 Mon Sep 17 00:00:00 2001 From: Dan Halbert Date: Sat, 17 Aug 2024 10:26:44 -0400 Subject: [PATCH 5/6] don't use yield; touch up documentation --- asyncio/core.py | 8 +++++--- asyncio/event.py | 2 -- asyncio/funcs.py | 6 +++--- asyncio/lock.py | 2 -- asyncio/stream.py | 22 +++------------------- 5 files changed, 11 insertions(+), 29 deletions(-) diff --git a/asyncio/core.py b/asyncio/core.py index 8a4eb1d..2409d8f 100644 --- a/asyncio/core.py +++ b/asyncio/core.py @@ -96,7 +96,9 @@ def sleep_ms(t, sgen=SingletonGenerator()): # CIRCUITPY-CHANGE: doc """Sleep for *t* milliseconds. - This is a coroutine, and a MicroPython extension. + This is a MicroPython extension. + + Returns a coroutine. """ # CIRCUITPY-CHANGE: add debugging hint @@ -108,9 +110,9 @@ def sleep_ms(t, sgen=SingletonGenerator()): # Pause task execution for the given time (in seconds) def sleep(t): # CIRCUITPY-CHANGE: doc - """Sleep for *t* seconds + """Sleep for *t* seconds. - This is a coroutine. + Returns a coroutine. """ return sleep_ms(int(t * 1000)) diff --git a/asyncio/event.py b/asyncio/event.py index 1cb1a30..bd63cb9 100644 --- a/asyncio/event.py +++ b/asyncio/event.py @@ -57,8 +57,6 @@ async def wait(self): # CIRCUITPY-CHANGE: doc """Wait for the event to be set. If the event is already set then it returns immediately. - - This is a coroutine. """ if not self.state: diff --git a/asyncio/funcs.py b/asyncio/funcs.py index e3c20c3..582ffc2 100644 --- a/asyncio/funcs.py +++ b/asyncio/funcs.py @@ -42,8 +42,6 @@ async def wait_for(aw, timeout, sleep=core.sleep): this should be trapped by the caller. Returns the return value of *aw*. - - This is a coroutine. """ aw = core._promote_to_task(aw) @@ -80,7 +78,9 @@ def wait_for_ms(aw, timeout): # CIRCUITPY-CHANGE: doc """Similar to `wait_for` but *timeout* is an integer in milliseconds. - This is a coroutine, and a MicroPython extension. + This is a MicroPython extension. + + Returns a coroutine. """ return wait_for(aw, timeout, core.sleep_ms) diff --git a/asyncio/lock.py b/asyncio/lock.py index 6dc8bbc..2320ebb 100644 --- a/asyncio/lock.py +++ b/asyncio/lock.py @@ -67,8 +67,6 @@ async def acquire(self): # CIRCUITPY-CHANGE: doc """Wait for the lock to be in the unlocked state and then lock it in an atomic way. Only one task can acquire the lock at any one time. - - This is a coroutine. """ if self.state != 0: diff --git a/asyncio/stream.py b/asyncio/stream.py index 24523b6..0d2e249 100644 --- a/asyncio/stream.py +++ b/asyncio/stream.py @@ -43,8 +43,6 @@ def close(self): async def wait_closed(self): # CIRCUITPY-CHANGE: doc """Wait for the stream to close. - - This is a coroutine. """ # TODO yield? @@ -54,8 +52,6 @@ async def wait_closed(self): async def read(self, n): # CIRCUITPY-CHANGE: doc """Read up to *n* bytes and return them. - - This is a coroutine. """ await core._io_queue.queue_read(self.s) @@ -67,7 +63,7 @@ async def readinto(self, buf): Return the number of bytes read into *buf* - This is a coroutine, and a MicroPython extension. + This is a MicroPython extension. """ # CIRCUITPY-CHANGE: await, not yield @@ -81,9 +77,7 @@ async def readexactly(self, n): Raises an ``EOFError`` exception if the stream ends before reading *n* bytes. - - This is a coroutine. - """ + """ r = b"" while n: @@ -101,8 +95,6 @@ async def readexactly(self, n): async def readline(self): # CIRCUITPY-CHANGE: doc """Read a line and return it. - - This is a coroutine. """ l = b"" @@ -133,8 +125,6 @@ def write(self, buf): async def drain(self): # CIRCUITPY-CHANGE: doc """Drain (write) all buffered output data out to the stream. - - This is a coroutine. """ mv = memoryview(self.out_buf) @@ -162,8 +152,6 @@ async def open_connection(host, port, ssl=None, server_hostname=None): Returns a pair of streams: a reader and a writer stream. Will raise a socket-specific ``OSError`` if the host could not be resolved or if the connection could not be made. - - This is a coroutine. """ from uerrno import EINPROGRESS @@ -188,7 +176,7 @@ async def open_connection(host, port, ssl=None, server_hostname=None): s = ssl.wrap_socket(s, server_hostname=server_hostname, do_handshake_on_connect=False) s.setblocking(False) ss = Stream(s) - yield core._io_queue.queue_write(s) + await core._io_queue.queue_write(s) return ss, ss @@ -214,8 +202,6 @@ def close(self): async def wait_closed(self): """Wait for the server to close. - - This is a coroutine. """ await self.task @@ -263,8 +249,6 @@ async def start_server(cb, host, port, backlog=5): writer streams for the connection. Returns a `Server` object. - - This is a coroutine. """ import socket From 2ea2b06c7dcc41538790cc4cef5bc3e709b80762 Mon Sep 17 00:00:00 2001 From: Dan Halbert Date: Wed, 21 Aug 2024 16:45:04 -0400 Subject: [PATCH 6/6] fix comment typo --- asyncio/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asyncio/core.py b/asyncio/core.py index 2409d8f..8937eba 100644 --- a/asyncio/core.py +++ b/asyncio/core.py @@ -428,7 +428,7 @@ def default_exception_handler(loop, context): # CIRCUITPY-CHANGE: doc """The default exception handler that is called.""" - # CIRCUITPY_CHANGE: use CircuitPython traceback printing + # CIRCUITPY-CHANGE: use CircuitPython traceback printing exc = context["exception"] print_exception(None, exc, exc.__traceback__)