execlib.router module

Router

exception execlib.router.CallbackTimeoutError[source]

Bases: Exception

exception execlib.router.CancelledFrameError[source]

Bases: Exception

class execlib.router.ChainRouter(ordered_routers)[source]

Bases: Router, Generic

Routes events to registered callbacks

__init__(ordered_routers)[source]
Parameters:
  • loop

  • workers – number of workers to assign the thread pool when the event loop is started. Defaults to None, which, when passed to ThreadPoolExecutor, will by default use 5x the number of available processors on the machine (which the docs claim is a reasonable assumption given threads are more commonly leveraged for I/O work rather than intense CPU operations). Given the intended context for this class, this assumption aligns appropriately.

add_router(router)[source]

TODO: allow positional insertion in ordered list

Note

the routemap extensions here shouldn’t be necessary, since 1) route maps show up only in matching_routes, and 2) matching_routes is only invoked in submit_event, which is totally overwritten for the ChainRouter type. All events are routed through to individual Routers, and which point their route maps are used.

get_listener(listener_cls=None)[source]

Create a new Listener to manage watched routes and their callbacks.

matching_routes(event, event_time=None)[source]

Colloquial callbacks now used as a dict of lists of callbacks, indexed by router, and only having keys for routers with non-empty callback lists.

queue_callbacks(event_idx, callbacks)[source]

Overridable by inheriting classes based on callback structure

shutdown()[source]
stop_event(event)[source]

Sub-routers do not get a “done” callback for their submit_event jobs, as they would if they handled their own event submissions. They will, however, set the submitted event as “running.” We can’t rely on sub-routers’ “done” callbacks to “unset” the running event, because the disconnect between the thread completing and execution of that callback may take too long.

Instead, we explicitly unset the running event for each of the constituent sub-routers at the same time we handle the ChainRouter’s notion of event’s ending.

wait_on_event_callbacks(event, callbacks, timeouts=None, conditions=None)[source]

Returns a dictionary mapping from

Note: relies on order of callback-associated dicts matching that of ordered_routers, which should happen in matching_routes.

This method blurs the OOP lines a bit, as we’re actually passing dicts rather than the lists expected by the super class. The parent submit_event is carefully designed to be indifferent; use caution when making changes.

class execlib.router.FrameDirective(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Indicates frame-level behavior when a callback fails.

CANCEL_FRAME = 2
CONTINUE_WITHOUT = 1
class execlib.router.RouteRegistryMeta(name, bases, attrs)[source]

Bases: type

Metaclass handling route registry at the class level.

class execlib.router.Router(loop=None, workers=None)[source]

Bases: Generic

Route events to registered callbacks

Note

Generalized registration includes an endpoint (the origin of an event), a pattern (to filter events at the endpoint), and a callback (to be executed if pattern is matched).

The Router _routes_ events to affiliated callbacks in a multi-threaded fashion. A thread pool handles these jobs as events are submitted, typically by a composing Listener. The Listener “hears” an event, and passes it on through to a Router to further filter and delegate any matching follow-up jobs.

This base Router implements most of the registry and filter model. When events are submitted for propagation, they are checked for matching routes. Events specify an origin endpoint, which is used as the filter for attached routes. The event is then subjected to the filter method, which checks if the event matches the registered pattern under the originated endpoint. If so, the callback is scheduled for execution, and the matching event is passed as its sole argument.

Subclasses are expected to implement (at least) the filter method. This function is responsible for wrapping up the task-specific logic needed to determine if an event, originating from a known endpoint, matches the callback-specific pattern. This method needn’t handle any other filter logic, like checking if the event originates from the provided endpoint, as this is already handled by the outer look in matching_routes.

get_listener is a convenience method that instantiates and populates an affiliated Listener over the register paths found in the Router. Listeners require a Router upon instantiation so events can be propagated to available targets when they occur. get_listener() is the recommended way to attain a Listener.

on debouncing events

Previously, debouncing was handled by listeners. This logic has been generalized and moved to this class, as it’s general enough to be desired across various Listener types. We also need unique, identifying info only available with a (endpoint, callback, pattern) triple in order to debounce events in accordance with their intended target.

tracking events and serializing callback frames

Although not part of the original implementation, we now track which events have a callback chain actively being executed, and prevent the same chain from being started concurrently. If the callback chain is actively running for an event, and that same event is submitted before this chain finishes, the request is simply enqueued. The clear_event method is attached as a “done callback” to each job future, and will re-submit the event once the active chain finishes.

While this could be interpreted as a harsh design choice, it helps prevent many many thread conflicts (race conditions, writing to the same resources, etc) when the same function is executed concurrently, many times over. Without waiting completely for an event to be fully handled, later jobs may complete before earlier ones, or interact with intermediate disk states (raw file writes, DB inserts, etc), before the earliest call has had a chance to clean up.

Details behind the threaded model and future management

Routers kick off execution in response to events. These events are received via .submit, and the following process is kick-started:

  1. Each event is wrapped in its own .submit_event call and submitted as a task to the primary thread pool. Let $E$ be the set of events, and $|E|$ be the number of events. .submit exits as soon as these $|E|$ tasks are enqueued, not waiting for the completion of the corresponding worker threads. There are now $|E|$ tier-I tasks waiting to be started by the router’s primary thread pool, with states set to “pending.”

  2. The primary thread pool begins running the enqueued .submit_event calls concurrently using allocated resources (e.g., four threads). Each .submit_event call matches the associated event $e$ to $c_e$ callbacks, according to the registered routes. These callbacks are each individually submitted to the secondary thread pool as tier-II tasks and waited upon within the .submit_event call. This thread pool separation prevents deadlocks which would otherwise be an issue if submitting both tier-I and tier-II tasks to the same thread pool. Tier-I tasks that have begun this process are in the “running” state, and the submitted tier-II futures are now “pending.”

  3. Once the $c_e$ callbacks for event $e$ are completed (which are tier-II tasks being waited upon within a tier-I task), their done-callbacks will be invoked within “a thread belonging to the process that added them” (with wait_on_event_callbacks calling through to submit_callback, which attaches general_task_done). Where these done callbacks are executed varies based on a few conditions. See

    https://stackoverflow.com/a/26021772/4573915

    for a great breakdown on this. The gist is the following:

    1. If the callback is attached to a future that is already cancelled or completed, it will be invoked immediately in the current thread doing the attaching.

    2. If the future is queued/pending and successfully cancelled, the thread doing the cancelling will immediately invoke all of the future’s callbacks.

    3. Otherwise, the thread that executes the future’s task (could produce either a successful result or an exception) will invoke the callbacks.

    So if a task completes (i.e., is not cancelled, producing either a result or an exception), the thread that ran the task will also handle the associated callbacks. If the task is successfully cancelled (which means it was never running and never allocated a thread), the cancelling context will handle the callbacks, and this happens here only in .shutdown() with the call thread_pool.shutdown(cancel_futures=True).

    The results of these futures are made available in this submit_event context. Note that these results are dictated by the logic in wait_on_futures. If a future was successfully cancelled or raised and exception during execution, it will not have a result to add to this list.

    The router-level post-callbacks are then submitted to the secondary thread pool and awaited in a similar fashion to the individual $c_e$ callbacks. The results obtained from the event callbacks are passed through to these “post-callbacks” for possible centralized processing.

  4. Once all post-callbacks have completed (along with their attached “done-callbacks,” which are just .general_task_done checks handled in the executor thread that ran the post-callback), finally the tier-I .submit_event future can be marked completed (either with a successfully attached result or an exception), and its attached “done-callbacks” will be ran the in the same tier-I thread that handled the task (which is general_task_done and clear_event, in that order).

  • General behaviors and additional remarks: * Thread pool futures can only be cancelled prior to “running” or “done” states.

    Both successful cancellation or completion trigger a future’s done-callbacks, which will be executed in one a few possible contexts depending several conditions, as detailed above.

    • Tier-I tasks have clear_event attached as a done-callback, which tracks the task result and resubmits the event if valid (successfully debounced) repeat requests were received while the event has been handled.

    • Both tier-I and tier-II callbacks have a .general_task_done callback, which attempts to retrieve the future result if it wasn’t cancelled (if it was, this retrieval would raise a CancelledError). If it wasn’t cancelled but an exception was raised during execution, this same exception will be re-raised and re-caught, logged as an error, and exit “cleanly” (since job failures shouldn’t throw off the entire process). A successful result retrieval will have no effect.

  • On interrupts, exception handling, and future cancellation: *

__init__(loop=None, workers=None)[source]
Parameters:
  • loop

  • workers – number of workers to assign the thread pool when the event loop is started. Defaults to None, which, when passed to ThreadPoolExecutor, will by default use 5x the number of available processors on the machine (which the docs claim is a reasonable assumption given threads are more commonly leveraged for I/O work rather than intense CPU operations). Given the intended context for this class, this assumption aligns appropriately.

add_post_callback(callback)[source]
clear_event(event, future)[source]

Clear an event. Pops the passed event out of running_events, and if the request counter is >0, the event is re-submitted.

This method is attached as a “done” callback to the main event wrapping job submit_event. The future given to this method is one to which it was attached as this “done” callback. This method should only be called when that future is finished running (or failed). If any jobs were submitted in the wrapper task, the future results here should be non-empty (even if the methods don’t return anything; we’ll at least have [None,...] if we scheduled at least one callback). We use this fact to filter out non-work threads that call this method. Because even the matching_routes check is threaded, we can’t wait to see an event has no work to schedule, and thus can’t prevent this method being attached as a “done” callback. The check for results from the passed future allows us to know when in fact a valid frame has finished, and a resubmission may be on the table.

Why we don’t need to worry about resubmission w/ empty results

Note that, even though we can’t check for whether there will be any matching routes prior to calling submit_event (we have to wait until we’re in that method, at which point this method will already be attached as a callback), the event will never be marked as “running” and added to running_events. This means that we won’t queue up any callbacks for the same event while waiting on it, and then exit early here, never to re-submit. The worry was that a event response might match 0 callbacks (e.g., due to debouncing) and return [] from submit_event, but while waiting for this to complete, the same event is submitted and matches (e.g., debouncing timers now allowing the event through). This could mean that the repeat event gets queued behind the event in running_events and should be resubmitted here as a “queued callback,” but won’t do so because we exit early if no results are obtained. This is not an issue because the original event (that didn’t match any callbacks) will never be marked as running, and thus never prevent the second event from itself running if in fact in matches a non-empty callback set. This means that an empty future result set seen here indicates both 1) that no work took place, and 2) no conflicting events were prevented from running, and we can exit early here.

event_index(event)[source]
extend_listener(listener)[source]

Extend a provided Listener object with the Router instance’s listener_kwargs.

filter(event, pattern, **listen_kwargs)[source]

Determine if a given event matches the provided pattern

Parameters:
  • event (TypeVar(E, bound= Event))

  • pattern

  • listen_kwargs

Return type:

bool

general_callback(future)[source]
get_delayed_callback(callback, delay, index)[source]
Parameters:
  • callback (Callable) – function to wrap

  • delay (int | float) – delay in ms

get_listener(listener_cls=None)[source]

Create a new Listener to manage watched routes and their callbacks.

listener_cls

alias of Listener

matching_routes(event, event_time=None)[source]

Return eligible matching routes for the provided event.

Note that we wait as late as possible before enqueuing matches if the event is in fact already active in a frame. If this method were start filtering results while the frame is active, and the frame were to finish before all matching callbacks were determined, we would be perfectly happy to return all matches, and allow the outer submit_event context to run them right away in a newly constructed frame. The _very_ next thing that gets done is adding this event to the active event tracker. Otherwise, matching is performed as usual, and eligible callbacks are simply enqueued for the next event frame, which will be checked in the “done” callback of the active frame. The logic here should mostly “seal up” any real opportunities for error, e.g., a frame ending and popping off elements from running_events half-way through their inserting at the end of this method, or multiple threads checking for matching routes for the same event, and both coming away with a non-empty set of matches to run. That last example highlights precisely how the single event-frame model works: many threads might be running this method at the same time, for the same event (which has fired rapidly), but only one should be able to “secure the frame” and begin running the matching callbacks. Making the “active frame check” both as late as possible and as close to the event blocking stage in the tracker (in submit_event), we make the ambiguity gap as small as possible (and almost certainly smaller than any realistic I/O-bound event duplication).

Return type:

tuple[list[Callable], list[int | float]]

Note: on event actions

The debounce reset is now only set if the event is successfully filtered. This allows some middle ground when trying to depend on event actions: if the action passes through, we block the whole range of actions until the debounce window completes. Otherwise, the event remains open, only to be blocked by the debounce on the first matching action.

property primary_thread_pool

Handle tier-I futures.

queue_callbacks(event_idx, callbacks)[source]

Overridable by inheriting classes based on callback structure

register(endpoint, callback, pattern, debounce=200, delay=10, callback_timeout=None, condition=FrameDirective.CONTINUE_WITHOUT, **listener_kwargs)[source]

Register a route.

Note: Listener arguments

Notice how listener_kwargs are accumulated instead of uniquely assigned to an endpoint. This is generally acceptable as some listeners may allow various configurations for the same endpoint. Note, however, for something like the PathListener, this will have no effect. Registering the same endpoint multiple times won’t cause any errors, but the configuration options will only remain for the last registered group.

(Update) The above remark about PathListener’s is no longer, and likely never was. Varying flag sets under the same endpoint do in fact have a cumulative effect, and we need to be able disentangle events accordingly through submitted event’s action value.

Parameters:
  • endpoint

  • callback (Callable) – callable accepting an event to be executed if when a matching event is received

  • pattern – hashable object to be used when filtering event (passed to inherited filter(...))

  • debounce

  • delay

  • callback_timeout – timeout for waiting

property secondary_thread_pool

Handle tier-II futures.

shutdown()[source]
stop_event(event)[source]

Pop event out of the running events tracker and return it.

submit(events, callbacks=None)[source]

Handle a list of events. Each event is matched against the registered callbacks, and those callbacks are ran concurrently (be it via a thread pool or an asyncio loop).

submit_callback(callback, *args, **kwargs)[source]
submit_event(event, callbacks=None, timeouts=None, conditions=None)[source]

Group up and submit all matching callbacks for event. All callbacks are ran concurrently in their own threads, and this method blocks until all are completed.

In the outer submit context, this blocking method is itself ran in its own thread, and the registered post-callbacks are attached to the completion of this function, i.e., the finishing of all callbacks matching provided event.

Note that an event may not match any routes, in which case the method exits early. An empty list is returned, and this shows up as the outer future’s result. In this case, the event is never considered “running,” and the non-result picked up in clear_event will ensure it exits right away (not even attempting to pop the event from the running list, and for now not tracking it in the event log).

Return type:

list

submit_event_callback(callback, event, *args, **kwargs)[source]

Note: this method is expected to return a future. Perform any event-based filtering before submitting a callback with this method.

wait_on_event_callbacks(event, callbacks, timeouts=None, conditions=None)[source]

Waits for event-associated callbacks to complete.

Submits each callback in callbacks to the thread pool (via submit_callback), passing event as the only parameter to each. Once started, the future for callback callbacks[i] will have timeouts[i] seconds to complete. If it has not completed in this time, the future’s result is set to the Timeout exception

Overridable by inheriting classes based on callback structure

wrap_timed_callback(callback, submitted_time)[source]

Check for shutdown flag and exit before running the callbacks.

Applies primarily to jobs enqueued by the ThreadPoolExecutor but not started when an interrupt is received.

class execlib.router.RouterBuilder(register_map)[source]

Bases: ChainRouter

Builds a (Chain)Router using attached methods and passed options.

This class can be subtyped and desired router methods attached using the provided route decorator. This facilitates two separate grouping mechanisms:

  1. Group methods by frame (i.e., attach to the same router in a chain router)

  2. Group by registry equivalence (i.e, within a frame, registered with the same parameters)

These groups are indicated by the following collation syntax:

@route('<router>/<frame>', '<route-group>', **route_kwargs)
def method(...):
    ...

and the following is a specific example:

@route(router='convert', route_group='file', debounce=500)
def file_convert_1(self, event):
    ...

which will attach the method to the “convert” router (or “frame” in a chain router context) using parameters (endpoint, pattern, and other keyword args) associated with the “file” route group (as indexed by the register_map provided on instantiation) with the debounce route keyword (which will override the same keyword values if set in the route group). Note that the exact same @route signature can be used for an arbitrary number of methods to be handled in parallel by the associated Router.

Note that there is one reserved route group keyword: “post,” for post callbacks. Multiple post-callbacks for a particular router can be specified with the same ID syntax above.

Map structures

The following is a more intuitive breakdown of the maps involved, provided and computed on instantiation:

# provided
register_map[<router-name>] -> ( Router, { <type>: ( ( endpoint, pattern ), **kwargs ) } )

# computed
routers[<router-name>][<type>] -> [... <methods> ...]

TODO

Consider “flattening” the register_map to be indexed only by <type>, effectively forcing the 2nd grouping mechanism to be provided here (while the 1st is handled by the method registration within the body of the class). This properly separates the group mechanisms and is a bit more elegant, but reduces the flexibility a bit (possibly in a good way, though).

__init__(register_map)[source]
Parameters:
  • loop

  • workers – number of workers to assign the thread pool when the event loop is started. Defaults to None, which, when passed to ThreadPoolExecutor, will by default use 5x the number of available processors on the machine (which the docs claim is a reasonable assumption given threads are more commonly leveraged for I/O work rather than intense CPU operations). Given the intended context for this class, this assumption aligns appropriately.

route_registry = {}
execlib.router.route(router, route_group, **route_kwargs)[source]