Skip to content

Conversation

@BakasuraRCE
Copy link
Contributor

@BakasuraRCE BakasuraRCE commented Nov 16, 2022

Issue number: #1708

Summary

Add new AsyncBatchProcessor and async_batch_processor decorator to support async batch processing for a customer's record handler.

Before

classDiagram
    class BasePartialProcessor {
        <<interface>>
        +success_handler(record, result) SuccessResponse
        +failure_handler(record, exception) FailureResponse
        +process()
        +_prepare()
        +_clean()
        +_process_record_(record)
        +_async_process() List~Tuple~
        +__enter__()
        +__exit__()
        +__call__(records, handler, lambda_context)
    }

    class BatchProcessor {
        +Dict batch_response
        +response() Dict
        +_process_record()
        +_prepare()
        +_clean()
    }

    BasePartialProcessor <|-- BatchProcessor : implement
    note for BatchProcessor "implements entire interface and logic for batch processing"
Loading

New

classDiagram
    class BasePartialProcessor {
        <<interface>>
        +success_handler(record, result) SuccessResponse
        +failure_handler(record, exception) FailureResponse
        +process()
        +_prepare()
        +_clean()
        +_process_record_(record)
        +_async_process_record_(record)
        +_async_process() List~Tuple~
        +__enter__()
        +__exit__()
        +__call__(records, handler, lambda_context)
    }

    class BasePartialBatchProcessor {
        +Dict batch_response
        +response() Dict
        +_prepare()
        +_clean()
    }

    class BatchProcessor {
        +_process_record()
        +_async_process_record() NotImplementedError
    }

    class AsyncBatchProcessor {
        +_process_record() NotImplementedError
        +_async_process_record()
    }

    BasePartialProcessor <|-- BasePartialBatchProcessor : implement
    BasePartialBatchProcessor <|-- BatchProcessor : inherit
    BasePartialBatchProcessor <|-- AsyncBatchProcessor : inherit

    note for BasePartialBatchProcessor "implements shared logic for batch processing used for sync and async processing"
    note for BatchProcessor "only implements logic to call customers' record handler synchronously"
    note for AsyncBatchProcessor "only implements logic to call customers' record handler asynchronously"
Loading

Changes

New

AsyncBatchProcessor class

New class to encapsulate async processing handlers by implementing _async_process_record interface. Same logic as BatchProcessor but async. It expects record handlers to be async functions now.

async_batch_processor decorator

New decorator that instantiates AsyncBatchProcessor, also sync to prevent forcing customers to make their Handlers async causing side effects to existing middlewares. AsyncBatchProcessor handles sync->async operation transparently via closures.

BasePartialBatchProcessor class

Contains shared logic for all processors be synchronous or asynchronous. By shared logic, this means success handler, failure handler, message ID collection from SQS, Kinesis, DynamoDB Streams, etc.

For a future major version, we can further decompose by creating more specialized classes, for example KinesisStreamsBatchProcessor. This will become more relevant when we support beyond Lambda runtime, where their logic and debugging will benefit from a clear separation.

Async methods and ABCs

BasePartialProcessor base class add a new method async_process and a new async ABC async_process_record().

  • def async_process(). Synchronous function with an async closure to process records asynchronously. Despite the name, its outer function is sync to prevent forcing customers to make their Handlers async, otherwise it'd impact existing middlewares we're unaware of.
  • async def _async_process_record(). Async version of _process_record abstract method

Major changes

BatchProcessor

We move all logic to BasePartialBatchProcessor to make it easier to split Sync vs Async processors.

Both processors BatchProcessor and AsyncBatchProcessor have the same logic, except one handles asyncIO when calling an async record handler.

For a future where we want to support a threaded version, we can create a specialized ThreadedBatchProcessor class, inherit BasePartialBatchProcessor, and implement _process_record.

User experience

Please share what the user experience looks like before and after this change

image

Checklist

If your change doesn't seem to apply, please leave them unchecked.

Is this a breaking change?

RFC issue number:

Checklist:

  • Migration process documented
  • Implement warnings (if it can live side by side)

Acknowledgment

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Disclaimer: We value your time and bandwidth. As such, any pull requests created on non-triaged issues might not be successful.

feat: implement async_lambda_handler decorator
@BakasuraRCE BakasuraRCE requested a review from a team as a code owner November 16, 2022 00:06
@BakasuraRCE BakasuraRCE requested review from rubenfonseca and removed request for a team November 16, 2022 00:06
@boring-cyborg boring-cyborg bot added the tests label Nov 16, 2022
@pull-request-size pull-request-size bot added the size/L Denotes a PR that changes 100-499 lines, ignoring generated files. label Nov 16, 2022
@boring-cyborg
Copy link

boring-cyborg bot commented Nov 16, 2022

Thanks a lot for your first contribution! Please check out our contributing guidelines and don't hesitate to ask whatever you need.
In the meantime, check out the #python channel on our AWS Lambda Powertools Discord: Invite link

@BakasuraRCE
Copy link
Contributor Author

BakasuraRCE commented Nov 16, 2022

@heitorlessa the code still is untested

I didn't know where to put the new asynchronous decorator, so I created some tests in asynchronous version, and examples, let me know your comments

@heitorlessa heitorlessa requested review from heitorlessa and removed request for rubenfonseca November 17, 2022 08:58
Copy link
Contributor

@heitorlessa heitorlessa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you once again for the heavy lifting - I like the overall direction! It even brought up that we should (separate PR) have a way to use a ThreadPoolExecutor as an opt-in feature like concurrent=True for the synchronous processor.

Major comments summarized:

  • Breaking change. Removing process() from BasePartialProcessor breaks custom processors. Turns out (TIL) we don't have a test for it hence why it wasn't caught up earlier.
  • Let's keep Lambda handler synchronous. We can prevent a call stack slowdown, make async_lambda_handler redundant, and prevent a class of unforeseen issues for customers stacking decorators of different colours (sync/async) since ordering matters.
  • Single inheritance only. As I mentioned earlier, we can't use multiple inheritance as that will impact our ability to use Mypyc later.

@codecov-commenter
Copy link

codecov-commenter commented Nov 21, 2022

Codecov Report

Base: 97.52% // Head: 97.44% // Decreases project coverage by -0.08% ⚠️

Coverage data is based on head (0a79aac) compared to base (bf2558a).
Patch coverage: 92.30% of modified lines in pull request are covered.

Additional details and impacted files
@@             Coverage Diff             @@
##           develop    #1724      +/-   ##
===========================================
- Coverage    97.52%   97.44%   -0.08%     
===========================================
  Files          143      143              
  Lines         6573     6606      +33     
  Branches       468      471       +3     
===========================================
+ Hits          6410     6437      +27     
- Misses         128      132       +4     
- Partials        35       37       +2     
Impacted Files Coverage Δ
aws_lambda_powertools/utilities/batch/__init__.py 100.00% <ø> (ø)
aws_lambda_powertools/utilities/batch/base.py 94.67% <92.30%> (-3.12%) ⬇️

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

☔ View full report at Codecov.
📢 Do you have feedback about the report comment? Let us know in this issue.

@heitorlessa heitorlessa changed the title feat: implement AsyncBatchProcessor feat(batch): add AsyncBatchProcessor for concurrent processing Nov 23, 2022
@heitorlessa heitorlessa changed the title feat(batch): add AsyncBatchProcessor for concurrent processing feat(batch): add async_batch_processor for concurrent processing Nov 23, 2022
@github-actions github-actions bot added the feature New feature or functionality label Nov 23, 2022
fix: remove async_lambda_handler
@BakasuraRCE
Copy link
Contributor Author

Thank you once again for the heavy lifting - I like the overall direction! It even brought up that we should (separate PR) have a way to use a ThreadPoolExecutor as an opt-in feature like concurrent=True for the synchronous processor.

Major comments summarized:

* **Breaking change**. Removing `process()` from `BasePartialProcessor` breaks [custom processors](https://awslabs.github.io/aws-lambda-powertools-python/2.3.1/utilities/batch/#create-your-own-partial-processor). Turns out (TIL) we don't have a test for it hence why it wasn't caught up earlier.

* **Let's keep Lambda handler synchronous**. We can prevent a call stack slowdown, make `async_lambda_handler` redundant, and prevent a class of unforeseen issues for customers stacking decorators of different colours (sync/async) since ordering matters.

* **Single inheritance only**. As I mentioned earlier, we can't use multiple inheritance as that will impact our ability to use Mypyc later.

Hey!

  • Make the simple inheritance code forces to do things not so beautiful, such as loading the methods _async_process_record and async_process to the class BasePartialProcessor.
  • So, I changed the asyncio.run() in async_batch_processor, I had BIG problems with that in another project.
  • About the concurrent=True, following the best practices, it is best to have "atomic responsibilities" for methods/classes. It would be better to get a new class called ThreadBatchProcessor class with its corresponding thread_batch_processor, keeping this explicit, also solve the tend to confuse asynchrony with the concurrence

@heitorlessa
Copy link
Contributor

I'm glad you called it out - thank you for keeping us honest ;)

Answering them

Make the simple inheritance code forces to do things not so beautiful

I couldn't agree more. Because we optimize to run in a constrained environment (almost like embedded), we have to make these decisions which would be sub-optimal in different contexts. We're trading a more beautiful implementation with future speed gains that could save $ for customers.

So, I changed the asyncio.run() in async_batch_processor, I had BIG problems with that in another project.

Mind sharing what the big problems were so we can learn from it? I'm new to asyncio in Python from a library authorship point of view - if there's a balance we can strike to avoid decorator stacking confusion and increased call stack while avoiding these scars you've earned, it'd be lovely!

  • About the concurrent=True, following the best practices, it is best to have "atomic responsibilities" for methods/classes.

I worded that poorly. What I meant for concurrent=True was an UX thinking, I didn't put any thought on the underlying implementation at the time of writing. You're absolutely right on ThreadBatchProcessor - way to go, as threading can get complex pretty quickly.

Since batch_processor decorator would simply call ThreadBatchProcessor.process() where the threading logic is, and it's also sync, I find thread_batch_processor mostly cognitive load when balancing semantics, intent and non-developers using it -- am I missing something here?

@BakasuraRCE
Copy link
Contributor Author

Mind sharing what the big problems were so we can learn from it? I'm new to asyncio in Python from a library authorship point of view - if there's a balance we can strike to avoid decorator stacking confusion and increased call stack while avoiding these scars you've earned, it'd be lovely!

It is something that I still can't explain well, I have worked for a good time with this function in docker environments without memory or disk limitations and always following the documentation: It should be used as a main entry point for asyncio programs, and should ideally only be called once.

However, in the AWS Lambda environment, even with high limits, it simply fails when for some reason an unexpected error occurs(prematurely closing the main loop without allowing the error to spread up).

Imagine an error 423 in an API or an error 400 in the boto3 behind an asynchronous wrapper with a thread underneath. In non-Lambda environments, the error is simply propagated up, and then the loop closes without complaints.

However, python's recommendation is to use asyncio.run instead of deep functions to create the main loop.

Since batch_processor decorator would simply call ThreadBatchProcessor.process() where the threading logic is, and it's also sync, I find thread_batch_processor mostly cognitive load when balancing semantics, intent and non-developers using it -- am I missing something here?

Exactly, it is a matter of being explicit and not giving rise to confusion, it is also due to maintenance of the code, imagine in the future for some reason processing messages using the class ThreadBatchProcessor, it is not simply calling to ThreadBatchProcessor.process().

The batch_processor would have to be modified in consequence to have synchronous logic and a multithread logic, this could be confusing to maintain, especially not intuitive.

I'm sorry for not having a more technical detail about asyncio, I wait for your thoughts :)

@heitorlessa
Copy link
Contributor

Aaah, that's highly likely due to the nature of container/process freezing in Lambda. There are similar side effects in NodeJS too.

Agreed on the unforeseeable nature of thread_batch_processor needing custom logic in the future. Let's go ahead with these.

I'm going to re:Invent today and will be mostly away for a week, feel free to reach out to @rubenfonseca for anything.

We can help write the docs, do a thorough review, and contribute any possible gaps to make this quicker.

Thank you so much again for helping out with this (Gracias!!)

Copy link
Contributor

@heitorlessa heitorlessa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last comments before winter holidays.

In summary:

  • We need a summary of the changes in the PR body. This helps future maintainers/contributors look up the reason for certain changes and for reviewer's understanding
  • Made comments on doc strings. It will likely be foreign to other maintainers on the async closure so a more complete docstring helps remove ambiguity.
  • We need a more realistic async batch example. This will ease writing docs and better emphasize the usefulness of this feature for other customers ;)
  • [Optional] We need a test to prevent custom processor regression. Earlier we manually caught that custom processors use methods that were removed as part of the initial PR. I'm pretty sure we will get sidetracked and not add after this PR is merged, so let's add it - whoever has the bandwidth in January.

Thank you so much again

from aws_lambda_powertools.utilities.typing import LambdaContext


async def async_record_handler(record: SQSRecord):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

np: what would be a realistic example here? maybe an async crawler?

As part of docs refactoring, we haven't reached Batch yet, we're trying to include more complete examples.

Example: https://awslabs.github.io/aws-lambda-powertools-python/2.4.0/core/tracer/#ignoring-certain-http-endpoints

BakasuraRCE and others added 2 commits December 15, 2022 19:51
Co-authored-by: Heitor Lessa <lessa@amazon.nl>
Signed-off-by: Bakasura <bakasura@protonmail.ch>
@rubenfonseca rubenfonseca self-assigned this Jan 16, 2023
@heitorlessa
Copy link
Contributor

Back from holidays and our team offsite ends this week. I'll resume this PR next week so we can get it merged for our next release ;-)

Hope you had a great end of the year, and look forward to getting this to the finish line!

Thank you for the prolonged patience

heitorlessa and others added 3 commits January 31, 2023 14:31
* develop: (24 commits)
  chore(deps): bump docker/setup-buildx-action from 2.4.0 to 2.4.1 (aws-powertools#1903)
  chore(deps-dev): bump aws-cdk-lib from 2.63.0 to 2.63.2 (aws-powertools#1904)
  update changelog with latest changes
  docs(idempotency): add IAM permissions section (aws-powertools#1902)
  chore(deps-dev): bump mkdocs-material from 9.0.10 to 9.0.11 (aws-powertools#1896)
  chore(deps-dev): bump mypy-boto3-appconfig from 1.26.0.post1 to 1.26.63 (aws-powertools#1895)
  chore(maintainers): fix release workflow rename
  update changelog with latest changes
  docs(homepage): set url for end-of-support in announce block (aws-powertools#1893)
  chore(deps-dev): bump mkdocs-material from 9.0.9 to 9.0.10 (aws-powertools#1888)
  chore(deps-dev): bump mypy-boto3-s3 from 1.26.58 to 1.26.62 (aws-powertools#1889)
  chore(deps-dev): bump black from 22.12.0 to 23.1.0 (aws-powertools#1886)
  chore(deps-dev): bump aws-cdk-lib from 2.62.2 to 2.63.0 (aws-powertools#1887)
  update changelog with latest changes
  feat(metrics): add default_dimensions to single_metric (aws-powertools#1880)
  chore: update v2 layer ARN on documentation
  bump version to 2.7.1
  update changelog with latest changes
  docs(homepage): add banner for end-of-support v1 (aws-powertools#1879)
  fix(license): correction to MIT + MIT-0 (no proprietary anymore) (aws-powertools#1883)
  ...
@heitorlessa heitorlessa linked an issue Feb 7, 2023 that may be closed by this pull request
2 tasks
@heitorlessa
Copy link
Contributor

Pushed some changes to improve maintenance. Next: I'll create a summary of the changes in the PR before refactoring the doc examples, and create docs for this. If nothing else drags me over meetings and such, this should be in the next release 2.8.0 ;)

@heitorlessa
Copy link
Contributor

Pull request description changed to reflect all changes, and to make it easier for maintainers to reason what, why, and the effect of these changes.

Added two mermaid diagrams to more easily grasp it as a summary


Before

classDiagram
    class BasePartialProcessor {
        <<interface>>
        +success_handler(record, result) SuccessResponse
        +failure_handler(record, exception) FailureResponse
        +process()
        +_prepare()
        +_clean()
        +_process_record_(record)
        +_async_process() List~Tuple~
        +__enter__()
        +__exit__()
        +__call__(records, handler, lambda_context)
    }

    class BatchProcessor {
        +Dict batch_response
        +response() Dict
        +_process_record()
        +_prepare()
        +_clean()
    }

    BasePartialProcessor <|-- BatchProcessor : implement
    note for BatchProcessor "implements entire interface and logic for batch processing"
Loading

New

classDiagram
    class BasePartialProcessor {
        <<interface>>
        +success_handler(record, result) SuccessResponse
        +failure_handler(record, exception) FailureResponse
        +process()
        +_prepare()
        +_clean()
        +_process_record_(record)
        +_async_process_record_(record)
        +_async_process() List~Tuple~
        +__enter__()
        +__exit__()
        +__call__(records, handler, lambda_context)
    }

    class BasePartialBatchProcessor {
        +Dict batch_response
        +response() Dict
        +_prepare()
        +_clean()
    }

    class BatchProcessor {
        +_process_record()
        +_async_process_record() NotImplementedError
    }

    class AsyncBatchProcessor {
        +_process_record() NotImplementedError
        +_async_process_record()
    }

    BasePartialProcessor <|-- BasePartialBatchProcessor : implement
    BasePartialBatchProcessor <|-- BatchProcessor : inherit
    BasePartialBatchProcessor <|-- AsyncBatchProcessor : inherit

    note for BasePartialBatchProcessor "implements shared logic for batch processing used for sync and async processing"
    note for BatchProcessor "only implements logic to call customers' record handler synchronously"
    note for AsyncBatchProcessor "only implements logic to call customers' record handler asynchronously"
Loading

@pull-request-size pull-request-size bot removed the size/L Denotes a PR that changes 100-499 lines, ignoring generated files. label Feb 7, 2023
@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Feb 7, 2023
@pull-request-size pull-request-size bot added the size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. label Feb 7, 2023
@boring-cyborg boring-cyborg bot added the dependencies Pull requests that update a dependency file label Feb 7, 2023
@heitorlessa
Copy link
Contributor

ALL DONE 🎉 🚀 @BakasuraRCE -- I'll wait until Thursday for any last comments before we merge and push a new release ;)

I wanted to thank you one more time for this big effort. Since you last touched, I've made the following changes:

  • Documented API changes (new/refactoring) in the PR description
  • Removed the sync code snippet, and refactored the async code snippet to better show case this change
  • Created a new section in the docs, along with warnings when used with Tracer, and guidance for those new to AsyncIO
  • Fixed docstrings in classes (BatchProcessor and AsyncBatchProcessor) to ensure VSCode/PyCharm play nice when auto-completing docs

@sthulb
Copy link
Contributor

sthulb commented Feb 9, 2023

@BakasuraRCE Hey, I'm looking into writing a load test for this today/tomorrow to check perf :)

@sthulb
Copy link
Contributor

sthulb commented Feb 9, 2023

I created a simple load test. I created a SQS queue containing 5500 messages. When all of the messages were in the queue, I set the event source of my lambda to enabled. My event source had the following config:

  • Concurrency: 2
  • Batch size: 10000
  • Max window: 60 secs

I ran two versions of the function (sync and async using the code in this PR). The sync version took on average 1.2 seconds, the async version took around 300ms. It’s a vast improvement.

The function setup:

  • Memory: 512
  • Timeout: 10 secs

@leandrodamascena leandrodamascena merged commit 32d11e0 into aws-powertools:develop Feb 10, 2023
@boring-cyborg
Copy link

boring-cyborg bot commented Feb 10, 2023

Awesome work, congrats on your first merged pull request and thank you for helping improve everyone's experience!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

dependencies Pull requests that update a dependency file documentation Improvements or additions to documentation feature New feature or functionality size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Feature request: Async BatchProcessor (use case: slow processing of each item)

6 participants