Add asyncio support#359
Conversation
fix typo
Handle timeout in aread_response
Merge from upstream
* Comment annotation fixups * Added SdoServer aupload and adownload * Fix missing Network.is_async() uses * Fix workaround in pdo.map.save/asave
|
What is the best way to deal with var = param.data # Non-async use
var = await param.aget_data() # Async useMy goal has been to keep the async and non-async uses of canopen as equal as possible, but this is an area where users will see a difference. |
|
There is an implementation for |
|
@acolomb I'm not precisely sure what you mean, so let me guess: You need separate |
|
Sorry, I was trying to answer your question in the previous comment about synchronous getters / setters used in the properties. What I meant was that instead we already do have methods to access a variable remotely. Those might be a better fit to mirror to the async world: value = sdo_var.raw # This is the usual, terse style recommended in the docs
value = sdo_var.read(fmt='raw') # This also works already
value = await sdo_var.aread(fmt='raw') # Feels like a natural enhancement for asyncNote that the |
|
Codecov ReportAttention: Patch coverage is
❗ Your organization needs to install the Codecov GitHub app to enable full functionality. Additional details and impacted files@@ Coverage Diff @@
## master #359 +/- ##
==========================================
- Coverage 71.36% 69.03% -2.34%
==========================================
Files 26 27 +1
Lines 3129 3397 +268
Branches 480 523 +43
==========================================
+ Hits 2233 2345 +112
- Misses 765 906 +141
- Partials 131 146 +15
|
* Thread dependent sentinel guard
* All callbacks are synchronous and same in both sync and async mode * Sync waiting is done with `asyncio.to_thread()` from async
Codecov ReportAttention: Patch coverage is
📢 Thoughts on this report? Let us know! |
* Make code more similar the upstream code * Implement missing async functions * Update README.rst and example * Revert test cases to be diffable with upstream * Ensure all skipTest() have useful messages * Wash FIXMEs and NOTEs
* Adding `AllowBlocking` for temporary pausing the async guard * skipTest() cleanup * Increase test coverage
* OD object lookup issue * SDO testing warning issue
* Fixed uncovered bugs * Bumped minimum py version to 3.9 (due to asyncio compatibility) * Added tests for PDO to increase coverage
|
This branch is ready for review and integration into the project. Its not fully complete (I suppose it never will). The biggest lack is that there is no documentation for async. The branch's README (https://github.com/sveinse/canopen-asyncio?tab=readme-ov-file#difference-between-async-and-non-async-version) contains an overview over the current differences between this branch and the main project. The port is quite "naive" async-wise as it relies on |
acolomb
left a comment
There was a problem hiding this comment.
Phew, that's a lot of material and I hope you don't mind my elaborate review comments. But first, let's take a step back and see if we can better define what the PR does and what the goals are.
I'm not really experienced with asyncio based programming in Python, but I do know pretty well what an event loop and coroutines are. So my main mental challenge with this is trying to imagine what an application would really look like if using asyncio to handle its functionality elegantly. From own experience, I'd say moving long-blocking background activities from threads to a (cooperatively scheduled) task is one of the main motivations -- such as waiting for a drive homing procedure to finish.
My impression on what this focuses on, with some overall remarks about the implementation:
-
Finding and marking all API functions potentially blocking on I/O. This is a useful thing in itself, but I don't think we want the NOTE comments in the code indefinitely. It makes sense to gather findings like you did now, but provides little extra value to the library consumer and will bit-rot really fast I suppose.
-
Detecting improper use of blocking functions in async context, which would block the main loop. This is useful but totally optional IMHO, so could better be moved to its own PR to focus on the actual async capabilities first. An approach with a single
assert canopen.async_guard.ensure_not_asyncline (or similar) could be just as good as the decorator approach, but clearer. -
Arranging user callbacks to be scheduled in the event loop instead of called immediately. I see several alternatives to the current dispatcher solution, so let's clarify what situation this applies to: A CAN message arrives on the network and the application needs to passively react to this external trigger. The reaction may be an internal callback (as in p402.py) or an arbitrarily long-running external application function. This uncertainty must be contained to not block the main event loop. Traditionally, it is executed inside the RX thread of python-can's
Notifieranyway, blocking that at worst. There is already functionality in there to schedule callbacks as tasks if they are coroutines, execute directly otherwise. So the easiest would be for the application to provide a coroutine directly as callback -- this just needs to be passed through the canopen library, so that the listener is also an async / await wrapper around the application callback. But shouldn't it be an application responsibility to decide whether the callback happens inside the RX background thread, or as a task on the event loop? -
Provide async (coroutine) variants of common blocking operation methods. This is the core of the "async integration" API-wise. But it's actually much less of a concern than the callback handling. Because the caller can always just do as you did in many places, calling
await asyncio.to_thread()on the existing synchronous function. Some of the really interesting functions are not converted to async variants yet. I suppose thatto_thread()approach is a stop-gap solution before implementing real, awaitable internal methods. -
Add unit tests for the new API. This is probably the largest portion of the diff, but I haven't checked it thoroughly as these high-level review points must be discussed and solved first.
Sorry if I missed something now, or it's just my lack of understanding asyncio in general. Plus I get too tired when tackling such a complex and large review late at night. But that's already lots of food for discussion I suppose.
| NOTIFIER_SHUTDOWN_TIMEOUT: float = 5.0 #: Maximum waiting time to stop notifiers. | ||
|
|
||
| def __init__(self, bus: Optional[can.BusABC] = None): | ||
| # NOTE: Function arguments changed to provide notifier, see #556 |
There was a problem hiding this comment.
As discussed in #556, this can simply be set between __init__() and connect(), with the latter only creating one if not already set (as you did).
| self.bus: Optional[can.BusABC] = bus | ||
| self.loop: Optional[asyncio.AbstractEventLoop] = loop |
There was a problem hiding this comment.
The type hints here are inferred from the arguments, better skip repeating them.
| # Register this function as the means to check if canopen is run in | ||
| # async mode. This enables the @ensure_not_async() decorator to | ||
| # work. See async_guard.py | ||
| set_async_sentinel(self.is_async()) |
There was a problem hiding this comment.
This is not "registering a function", but simply flipping a switch concerning the current thread ID, right? The comment should be adapted, maybe leftover from a previous design?
| logger.error("An error has caused receiving of messages to stop") | ||
| raise exc | ||
|
|
||
| def is_async(self) -> bool: |
| # Exceptions in any callbaks should not affect CAN processing | ||
| logger.exception("Exception in callback: %s", exc_info=exc) | ||
|
|
||
| def dispatch_callbacks(self, callbacks: List[Callback], *args) -> None: |
There was a problem hiding this comment.
This is one of the central elements it seems, making it easy to influence what exactly "invoking a callback" means and does. The approach is alright and the Network seems like a good place to handle such a definition centrally.
However, I'm unsure whether this is the best / only solution regarding callbacks. One alternative that springs to mind is to wrap each given callback function when it is registered somewhere, replacing it with a callable that creates the corresponding task instead of executing the given callback directly.
| * The mechanism for CAN bus callbacks have been changed. Callbacks might be | ||
| async, which means they cannot be called immediately. This affects how | ||
| error handling is done in the library. |
There was a problem hiding this comment.
This sounds a lot like duplicating what python-can already provides. Can we embrace that lower-level concept more instead of building our own dispatcher?
| * :code:`EmcyConsumer.on_emcy` | ||
| * :code:`NtmMaster.on_heartbaet` | ||
|
|
||
| * SDO block upload and download is not yet supported in async mode. |
There was a problem hiding this comment.
Which would be a really good fit for a coroutine I suppose, if done right :-)
| * :code:`ODVariable.__len__()` returns 64 bits instead of 8 bits to support | ||
| truncated 24-bits integers, see #436 | ||
|
|
||
| * :code:`BaseNode402` does not work with async |
There was a problem hiding this comment.
I can imagine how a homing procedure for example would benefit from being called with await.
| """ | ||
| return self._node.get_data(index, subindex) | ||
|
|
||
| async def aupload(self, index: int, subindex: int) -> bytes: |
There was a problem hiding this comment.
Since there is no blocking access, we don't really need the async method variants on the SdoServer (nor SdoBase for that matter). Unless LocalNode.get_data() is also allowed to be a blocking access pattern, implemented as a coroutine itself, there is no value in wrapping it here like this. The only expected blocking I/O on the library side is the SdoClient, thus the async methods can be introduced only there.
| def read_generator(self): | ||
| """Generator to run through steps for reading the PDO configuration |
There was a problem hiding this comment.
I like this solution. Maybe a similar pattern can be applied in other places?
This PR adds support of asyncio to canopen. The overall goals is to make canopen able to be used in either with asyncio or regular synchronous mode (but not at the same time) from the same code base.
Note that this work is still work in progress. This PR was created to discuss the specific solutions for async and non-async as mentioned in #272. This PR closes #272.
Current status until feature complete:
ImplementNot neededABlockUploadStream,ABlockDownloadStreamandATextIOWrapperfor async inSdoClient.EcmyConsumer.wait()for asyncAsync implementation ofOnlyLssMasterfast_scanBaseNode402~Omitted for nowNetwork.add_node()