Flow Manager

A FlowManager associates Flows with Schedules and runs each Flow according to its Schedule.

class flow.flow_manager.FlowManager

A manager to run many flows on predetermined schedules.

__init__(num_processes: int) → None

Initialize the manager with no flows.

Parameters:

num_processes (int) – The number of processes to use when running flows.

Flows are only run in parallel when pathos is installed (see run()).
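The following is a minimal sketch of the parallel-or-serial rule: a helper that applies a function to several items using up to num_processes workers when pathos is importable, and in order otherwise. The helper name map_flows is hypothetical and not part of FlowManager; it relies only on pathos.multiprocessing.ProcessPool, and treating num_processes <= 1 as serial is an assumption of this sketch.

```python
import importlib.util
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def map_flows(fn: Callable[[T], R], items: Iterable[T], num_processes: int) -> list[R]:
    """Hypothetical helper: apply fn to every item, in parallel only if pathos is installed."""
    items = list(items)
    if importlib.util.find_spec("pathos") is None or num_processes <= 1:
        # Serial fallback: no pathos (or a single process requested), run in order.
        return [fn(item) for item in items]

    from pathos.multiprocessing import ProcessPool  # optional dependency, imported lazily

    pool = ProcessPool(nodes=num_processes)
    try:
        # pathos serializes fn and items with dill, so lambdas and
        # dynamically created types can reach the worker processes.
        return pool.map(fn, items)
    finally:
        pool.close()
        pool.join()
        pool.clear()  # pathos caches pools; clear() discards this one
```

Both code paths return results in input order, so a call such as map_flows(lambda x: x * x, range(5), 2) gives the same list whether or not pathos is present.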

add_unconf_flow(flow: Flow[Any], schedule: Schedule, config_path: str, logfiles: list[str] = [], silent: bool = True) → None

Add an unconfigured flow to be run on a schedule.

Parameters:
  • flow (Flow[Any]) – The unconfigured flow to run

  • schedule (Schedule) – The schedule to run the flow on

  • config_path (str) – The path to the flow’s configurations

  • logfiles (list[str]) – Paths of log files to which the flow’s output is recorded. Defaults to []

  • silent (bool) – Whether to suppress the flow’s output to the terminal. Defaults to True

Raises:

Exception – Raised if the flow is configured, or if a flow with the same name is already present

Note that flows are configured and validated as soon as they are added, so every flow is validated before any flow is run.
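The add_unconf_flow() contract can be modelled with stand-ins. StubFlow and StubManager below are hypothetical and are not the library's classes; the configured property, the configure() method and the empty-path check are assumptions made so the sketch runs on its own. It shows the three documented behaviours: a configured flow is rejected, a duplicate name is rejected, and configuration happens at add time, so a bad configuration fails before run() is called.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class StubFlow:
    """Hypothetical stand-in for a Flow: just a name and an optional config."""
    name: str
    config: Optional[dict] = None

    @property
    def configured(self) -> bool:
        return self.config is not None

    def configure(self, config_path: str) -> None:
        # Assumption: a real flow would load and validate the file here.
        # The stub only rejects an empty path, so validation can fail.
        if not config_path:
            raise ValueError(f"{self.name}: empty config path")
        self.config = {"path": config_path}


@dataclass
class StubManager:
    """Hypothetical model of the documented add_unconf_flow() contract."""
    flows: dict = field(default_factory=dict)

    def add_unconf_flow(self, flow: StubFlow, schedule: Any, config_path: str) -> None:
        if flow.configured:
            raise Exception(f"{flow.name} is already configured")
        if flow.name in self.flows:
            raise Exception(f"a flow named {flow.name!r} is already present")
        # Configure (and thereby validate) now, so a bad config surfaces
        # at add time instead of partway through run().
        flow.configure(config_path)
        self.flows[flow.name] = (flow, schedule)
```

Because registration happens only after configure() succeeds, a flow whose configuration fails is never added to the manager.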

add_conf_flow(flow: Flow[Any], schedule: Schedule) → None

Add a configured flow to be run on a schedule.

Parameters:
  • flow (Flow[Any]) – The configured flow to run

  • schedule (Schedule) – The schedule to run the flow on

Raises:

Exception – Raised if the flow isn’t configured, or if a flow with the same name is already present
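The preconditions of add_conf_flow() are those of add_unconf_flow() with the configuration check inverted. A minimal sketch, assuming a flow exposes name and configured attributes; the ConfigurableFlow protocol and check_conf_flow() are hypothetical, and SimpleNamespace stands in for a real Flow.

```python
from types import SimpleNamespace
from typing import Any, Protocol


class ConfigurableFlow(Protocol):
    """Hypothetical interface; the real Flow class may expose this differently."""
    name: str
    configured: bool


def check_conf_flow(flow: ConfigurableFlow, registered: dict[str, Any]) -> None:
    """Raise if flow breaks the documented add_conf_flow() preconditions."""
    if not flow.configured:
        raise Exception(f"{flow.name} is not configured")
    if flow.name in registered:
        raise Exception(f"a flow named {flow.name!r} is already present")


registered: dict[str, Any] = {}
report = SimpleNamespace(name="report", configured=True)
check_conf_flow(report, registered)  # passes: configured and not yet present
registered[report.name] = report
```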

run() → None

Run the flow manager.

This checks the schedules of all flows and runs those that are due, each in a separate process.
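One scheduling pass might look like the sketch below. IntervalSchedule, is_due() and run_due() are hypothetical: the documentation does not describe the Schedule interface or whether run() loops, so this models a single pass that runs each due flow serially in the current process.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Optional


@dataclass
class IntervalSchedule:
    """Hypothetical schedule: due once every `every`, and immediately on first check."""
    every: timedelta
    last_run: Optional[datetime] = None

    def is_due(self, now: datetime) -> bool:
        return self.last_run is None or now - self.last_run >= self.every


def run_due(flows: dict[str, tuple[Callable[[], None], IntervalSchedule]], now: datetime) -> list[str]:
    """One pass: run every flow whose schedule is due and return their names."""
    ran = []
    for name, (flow, schedule) in flows.items():
        if schedule.is_due(now):
            flow()  # the documented manager would run this in its own process
            schedule.last_run = now
            ran.append(name)
    return ran
```

With an hourly and a daily flow registered, a first pass at 12:00 runs both, a pass at 12:30 runs neither, and a pass at 13:00 runs only the hourly one.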

FlowManager uses pathos, rather than the built-in multiprocessing library, to run Flows in parallel across multiple processes. This is necessary because flow instances (not to mention their dynamically defined configuration types) cannot be pickled with pickle and must instead be pickled with dill, which pathos uses. If pathos is not installed, Flows are run serially.
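The pickling limitation can be reproduced with the standard library alone. This sketch builds a class at runtime with type(), loosely standing in for a dynamically defined configuration type (make_config_type, DynamicConfig and retries are hypothetical names), and shows that pickle cannot serialize an instance because the class cannot be looked up by name. If dill is installed, it serializes the class by value instead.

```python
import pickle


def make_config_type(name: str, fields: dict) -> type:
    # Build a class at runtime, loosely like a dynamically defined config type.
    return type(name, (), dict(fields))


def pickles_with_stdlib(obj: object) -> bool:
    """True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


Config = make_config_type("DynamicConfig", {"retries": 3})
cfg = Config()
print(pickles_with_stdlib(cfg))  # False: no module attribute named DynamicConfig

try:
    import dill  # optional; pathos depends on it
except ImportError:
    dill = None

if dill is not None:
    # dill falls back to serializing the class itself, so the round trip works.
    restored = dill.loads(dill.dumps(cfg))
    print(restored.retries)
```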