co3.syncer module

class co3.syncer.Syncer(differ)[source]

Bases: Generic

__init__(differ)[source]
chunked_sync(l_select_kwargs, r_select_kwargs, chunk_time=None, item_limit=None, chunk_cap=None)[source]

Sync diff sets through attached handlers in chunks.

Chunk sizes are determined by a desired processing duration, i.e., how long should be spent aggregating items in handlers (handle_* methods) and in the subsequent call to process_chunk. This is particularly useful for database-driven interactions, where one wants to embrace bulk insertion (saving on repetitive, costly I/O-bound tasks) while performing intermittent consistency checks (“saving progress” along the way by inserting in batches). Otherwise, one could imagine performing a huge amount of computation only to encounter an error interacting with the database, roll back the transaction, and lose all results.

Chunk times are provided (rather than directly specifying sizes) due to the variable nature of handlers and the items they need to process. For example, if handlers prepare events for bulk submission to an execlog.Router, it’s difficult to estimate how long the resulting execution traces will take to complete. A regular file may take 0.01s to finish preparing, whereas an audio file may kick off a 5 minute transcription job. Targeting the aggregate chunk processing time allows us to dynamically readjust batch sizes as more jobs complete.
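As a rough illustration of targeting a duration rather than a size, the sketch below converts a time budget into an item count from an observed throughput. The function name next_chunk_size and its arguments are hypothetical and not part of co3; the actual sizing logic in chunked_sync may differ.

```python
def next_chunk_size(chunk_time, items_done, elapsed):
    """Estimate how many items fit into `chunk_time` seconds.

    Hypothetical helper: `items_done` items took `elapsed` seconds so far.
    """
    if items_done <= 0 or elapsed <= 0:
        # No usable measurement yet: start with a single item.
        return 1
    rate = items_done / elapsed  # observed items per second
    # Never return 0, so that slow items still make progress.
    return max(1, int(chunk_time * rate))


# Regular files at about 0.01s each: a 10s budget fits 1000 items.
fast = next_chunk_size(10, items_done=100, elapsed=1.0)
# A 5 minute transcription job: the same budget shrinks to one item.
slow = next_chunk_size(10, items_done=1, elapsed=300.0)
```

The same 10 second target thus yields very different chunk sizes depending on what the handlers are actually doing.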

A few extra technical remarks:

  • We randomly shuffle the input items to even out any localized bias present in the order of items to be processed. This helps produce a more stable estimate of the average chunk duration and promotes ergodicity (a “typical” batch becomes more representative of the “average” batch).

  • We employ an exponential moving average over times of completed chunks, more heavily weighting the durations of recently completed chunks when readjusting sizes. This is not because recent jobs reveal more about the average job length (we randomize these as mentioned above), but instead to reflect short-term changes in stationarity due to variable factors like resource allocation. For example, we’d like to quickly reflect the state of the CPU’s capacity after the start/end of external processing tasks so we might more accurately estimate batch sizes.
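The two remarks above can be sketched as follows. This is a minimal illustration, assuming the average is taken over an items-per-second rate and using a smoothing factor alpha of 0.5; the source does not state which quantity is smoothed or what factor is used, and the names EmaRate and shuffled are hypothetical.

```python
import random


class EmaRate:
    """Exponential moving average of throughput (hypothetical helper)."""

    def __init__(self, alpha=0.5):
        self.alpha = alpha  # weight given to the most recent chunk
        self.rate = None    # smoothed items per second, unset until first update

    def update(self, n_items, seconds):
        # `seconds` is assumed to be positive.
        observed = n_items / seconds
        if self.rate is None:
            self.rate = observed
        else:
            # Recent chunks count more; older ones decay geometrically.
            self.rate = self.alpha * observed + (1 - self.alpha) * self.rate
        return self.rate


def shuffled(items, seed=None):
    """Return a shuffled copy so that item order carries no local bias."""
    items = list(items)
    random.Random(seed).shuffle(items)
    return items


ema = EmaRate(alpha=0.5)
first = ema.update(100, 1.0)   # first chunk: 100 items/sec
second = ema.update(50, 1.0)   # machine got busier: estimate moves to 75.0
```

A larger alpha reacts faster to changes in available resources, at the cost of a noisier estimate.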

Note

This could be dangerous when going from fast file processing to slow note processing. Imagine estimating 1000 iter/sec, then carrying that estimate into the next batch when the true rate is closer to 0.2 iter/sec: the batch would be sized far too large and we would effectively lose any chunking. (Luckily, in practice, notes are almost always processed before the huge set of nested files, so we don’t experience this issue.)

Sync strategy

  1. Compute diffs under the provided Differ between supported selection sets for its SelectableResources.

  2. Perform any additional filtering of the diff sets with filter_diff_sets, producing the final set triplet.

  3. Begin a chunked processing loop for all items involved in the final diff sets. Items exclusive to the left resource are passed to handle_l_excl, items exclusive to the right resource to handle_r_excl, and items in the left-right intersection to handle_lr_int. Note how this mechanism should inform the implementation of these methods for inheriting subclasses: items are only handled once they are part of an actively processed chunk. We don’t first globally process the sets and “clean up” chunk-by-chunk, as this is less stable and can waste compute if we need to exit early.
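The three steps above can be sketched with plain dictionaries standing in for the two resources. Everything here is a simplified assumption: the real Differ, selection sets, and handler return values are not shown in this reference, the chunk size is fixed rather than time-based, and sync_sketch is a hypothetical name.

```python
def handle_l_excl(key, val):
    return ("l_excl", key)


def handle_r_excl(key, val):
    return ("r_excl", key)


def handle_lr_int(key, val):
    return ("lr_int", key)


def filter_diff_sets(l_excl, r_excl, lr_int):
    # Identity filter; a subclass could drop items here.
    return l_excl, r_excl, lr_int


def process_chunk(handler_results):
    # Stand-in for a bulk operation such as a batched insert.
    return list(handler_results)


def sync_sketch(left, right, chunk_size=2):
    # Step 1: diff the two selections by key.
    l_excl = {k: left[k] for k in left.keys() - right.keys()}
    r_excl = {k: right[k] for k in right.keys() - left.keys()}
    lr_int = {k: (left[k], right[k]) for k in left.keys() & right.keys()}

    # Step 2: optional filtering of the diff sets.
    l_excl, r_excl, lr_int = filter_diff_sets(l_excl, r_excl, lr_int)

    # Step 3: handle items only as part of the chunk being processed.
    items = (
        [(handle_l_excl, k, v) for k, v in sorted(l_excl.items())]
        + [(handle_r_excl, k, v) for k, v in sorted(r_excl.items())]
        + [(handle_lr_int, k, v) for k, v in sorted(lr_int.items())]
    )
    chunk_results = []
    for start in range(0, len(items), chunk_size):
        chunk = items[start:start + chunk_size]
        handler_results = [handler(k, v) for handler, k, v in chunk]
        chunk_results.append(process_chunk(handler_results))
    return chunk_results


result = sync_sketch({"a": 1, "b": 2}, {"b": 3, "c": 4}, chunk_size=2)
```

Because handlers run inside the loop, exiting after the first chunk leaves the remaining items untouched rather than half-processed.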

Parameters:
  • chunk_time (int | None) – desired processing time per batch, in seconds

  • item_limit (int | None) – number of items to sync before exiting

  • chunk_cap (int | None) – upper limit on the number of items per chunk. Even if never reached, setting this can help prevent anomalous, unbounded time estimates that yield prohibitively sized chunks.
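To make the roles of item_limit and chunk_cap concrete, the sketch below plans chunk sizes from a given size estimate. It assumes that item_limit truncates the total number of items and that chunk_cap clamps each chunk; how chunked_sync combines them internally is not documented here, and plan_chunks is a hypothetical name.

```python
def plan_chunks(total_items, estimated_size, item_limit=None, chunk_cap=None):
    """Return the list of chunk sizes that would be processed (hypothetical)."""
    # item_limit bounds how many items are synced overall.
    budget = total_items if item_limit is None else min(total_items, item_limit)

    size = estimated_size
    if chunk_cap is not None:
        # chunk_cap guards against runaway estimates.
        size = min(size, chunk_cap)
    size = max(1, size)  # always make progress

    sizes = []
    while budget > 0:
        step = min(size, budget)
        sizes.append(step)
        budget -= step
    return sizes


# An inflated estimate of 1000 items is held to 3 per chunk, 7 items total.
capped = plan_chunks(10, estimated_size=1000, item_limit=7, chunk_cap=3)
# Without limits, the estimate alone drives the plan.
uncapped = plan_chunks(5, estimated_size=2)
```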

Return type:

list

filter_diff_sets(l_excl, r_excl, lr_int)[source]
handle_l_excl(key, val)[source]
handle_lr_int(key, val)[source]
handle_r_excl(key, val)[source]
process_chunk(handler_results)[source]
shutdown()[source]
sync(l_select_kwargs, r_select_kwargs)[source]
Return type:

list