Worker Execution
Workers are instantiated and executed in a parent-child hierarchy with a WorkerSystem instance at the root of the hierarchy, and with an arbitrary number of descendant workers and levels (see the Worker System and Adding and Grouping Workers articles for more on worker creation).
Execution Rules
After the user starts the worker system with StartAsync() or Start(), it in turn runs any code that has been added to the worker system (typically with one of the Root() callback overloads). That code can create workers that will be started automatically, and those workers can in turn create child workers, etc.
The execution order therefore starts with the worker system, followed by the worker hierarchy from top to bottom, while obeying any added explicit constraints (maximum worker children running and start constraints between workers) as well as implicit dataflow link constraints.
E.g., in this worker system:
- Each of the five rectangles contained inside the WorkerSystem rectangle represents a distinct worker instance
- Execution order and duration is:
  - WorkerSystem, until all children have either completed or cannot be started
    - MoveFileWorker. If successful:
    - ActionWorker, until its three children have completed
      - ExecuteProcessWorker, FileHelperFileSource and AdbInsertTarget in parallel (with AdbInsertTarget delayed until FileHelperFileSource starts sending data)
The detailed rules for all worker systems are as follows:
- A worker system instance can only run once. To perform the same work again, a new worker system must be created (optionally with different workers and settings).
- A worker instance can only run once. To perform the same work again, a new worker must be added to the worker system (optionally with different settings).
- The library user is responsible for starting the worker system by calling StartAsync() or Start() on the worker system.
- The system is responsible for starting workers. It will keep attempting to start the children for a particular parent, until either all its children have completed, or no more children can be started due to start constraints (see below).
Important
By default, workers under a parent are only started after the worker creation code (e.g. added with WorkerSystem.Root()) has fully completed. This behavior can be modified with RunChildrenAsync().
In the documentation, the workers, constraints, and dataflow links are depicted as follows:
Start Constraints
Start constraints (a.k.a. dependencies) control whether a particular worker can be started or not. If any one constraint blocks the worker from being started, the worker remains in the Created state.
For each parent worker (and similarly for the worker system), the start constraints of all unstarted children are (re)evaluated:
- Automatically when:
  - The parent worker or worker system RunAsync() method has completed (which includes the WorkerSystem.Root() callback)
  - The parent's RunChildrenAsync() is called (e.g. to implement looping)
  - Any child worker changes Status
  - The first input port (if any) of any child worker goes beyond the Ready state
  - A start constraint property (IsStartable or MaxRunningChildren) on any child worker is set to a new value
- Manually when RescheduleChildren() is called on the parent. This is only rarely needed; in virtually all cases the above automatic behavior is sufficient.
There are two start constraints available, covered next: IsStartable and MaxRunningChildren.
WorkerBase.IsStartable
IsStartable is a callback that can block a worker from being started. Set this (via a constructor parameter or the property itself) to define a custom start constraint. After the worker has started, this property is ignored.
By default:
- Workers without input ports (i.e. non-dataflow workers and dataflow sources) are always startable
- Workers with input ports (i.e. transforms and targets) are blocked until at least one input has gone beyond the Ready state (i.e. an upstream worker sends data or port completion, which sets Inputs.AnyActivated to true)
This default behavior is implemented in the DefaultIsStartable() method, which you can call if you want to extend the default behavior instead of completely replacing it, e.g.:
worker.IsStartable = () => worker.DefaultIsStartable() && otherWorker.IsCompleted;
The IsStartable callback (e.g. an anonymous function like above, or a method) should return false to block the worker, or true to allow it to be started. Set it to null to get the default behavior.
Important
- To avoid dataflow deadlocks, all dataflow source workers that are linked to each other via ports (normally indirectly via transforms and targets) will be started as a group, but only when all of their (default or custom) start constraints are startable.
- If a dataflow worker starts, all other dataflow workers linked to it (directly and indirectly) via ports MUST be allowed to start at some point. Otherwise some of them (and the whole worker system) can hang forever, either waiting on input ports that never complete or being unable to send rows downstream.
For further details on setting dataflow constraints, please see Dataflow Blocking and Row Buffering.
Note
- Other limits (i.e. MaxRunningChildren) can also stop a worker from being run.
- Automatic evaluation of start constraints only happens on changes in the parent and the worker siblings, not on changes in any other workers. It is therefore best practice to only reference the parent and sibling workers when setting IsStartable.
Example
// No constraints, will be started immediately
var step1 = new ActionWorker(ws, "Step 1", aw =>
    {
        // ...
    });

// Only start "Step 2" if "Step 1" completed successfully
var step2 = new ActionWorker(ws, "Step 2", aw =>
    {
        // ...
    }
    , () => step1.IsSucceeded // This parameter sets the IsStartable property
    );

// Only start "Step 3" if either "Step 1" OR "Step 2" has
// completed (with any status)
var step3 = new ActionWorker(ws, "Step 3", aw =>
    {
        // ...
    }
    , () => step1.IsCompleted || step2.IsCompleted
    );
SucceededSequence()
Instead of explicitly setting IsSucceeded constraints on multiple sequential workers, you can call a SucceededSequence<TLastWorker>(TLastWorker) overload to accomplish the same thing:
var lastWorkerInSequence = new ActionWorker(/* ... */).SucceededSequence(
new AdbTableNonQueryWorker(/* ... */)
, new ActionWorker(/* ... */)
);
WorkerParent.MaxRunningChildren
MaxRunningChildren sets the maximum number of children for a particular parent that can be running simultaneously. When this limit is reached, no new children will be started until the number of running children is lower than the value of this property.
Note however that to avoid scheduling deadlocks, workers with input ports, i.e. transforms and targets, will be started irrespective of this setting. Transforms and targets still count towards the number of running children, and can therefore contribute to blocking non-dataflow workers and sources from being started.
The default value is -1, i.e. unlimited.
This property is typically used to limit the aggregate resource consumption (number of database connections, CPU, memory, ...) of a parent. Note that it only limits the number of direct children started, not grandchildren etc.
Important
- Avoid setting the limit too low, especially with dataflow children, since that could lead to the worker system hanging.
- When setting MaxRunningChildren, it is often useful to group related workers under an ActionWorker so that they are all enabled at the same time.
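As a minimal sketch of the setting described above, the following limits a worker system to two simultaneously running direct children. It only uses members that appear elsewhere in this article (WorkerSystem, Root(), ActionWorker, MaxRunningChildren and StartAsync()). The actionETL namespace, the class and method names, and the worker names are assumptions made for illustration.

```csharp
using System.Threading.Tasks;
using actionETL; // Assumed namespace for the library types used below

public static class MaxRunningChildrenSketch
{
    // Hypothetical entry point; call it from your own async code.
    public static async Task RunAsync()
    {
        var workerSystem = new WorkerSystem("Limit Parallelism")
            .Root(ws =>
            {
                // At most two direct children of the worker system
                // are allowed to run at the same time.
                ws.MaxRunningChildren = 2;

                // Five non-dataflow workers without other start constraints.
                for (int i = 1; i <= 5; i++)
                {
                    new ActionWorker(ws, "Step " + i, aw =>
                    {
                        // ... work for this step
                    });
                }
            });

        await workerSystem.StartAsync();
    }
}
```

Because the limit only applies to direct children, any workers created inside one of the "Step" workers would be governed by that worker's own MaxRunningChildren value.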
Extension Callbacks
Users can extend any existing worker or worker system instance by adding callbacks (as opposed to instantiating additional workers or creating Custom Workers or custom worker systems).
The callbacks will be called at various points throughout the instance execution. Unlike instantiating additional workers, these callbacks are guaranteed to only run when the worker or worker system has actually been started. Callbacks can be useful on a worker or worker system instance for:
- Performing custom setup and cleanup
- Checking and potentially modifying the success or failure status of an execution step
- Sending notifications or counting invocations
- Executing code when all child workers start or when each child worker completes
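Callbacks are added with the following methods; see these and Detailed Life-cycle below for further information:
- On worker systems: Root()
- On workers and worker systems: AddStartingCallback(), AddStartingChildrenCallback(), AddChildCompletedCallback(), AddCompletedCallback()
- On workers: AddRanCallback()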
Status Property
The worker system and each worker have a WorkerParentStatus Status property that describes the current state:
- Created - Has been instantiated, e.g. new ActionWorker(...)
- Running - Has been started
- Succeeded - Successful completion
- Canceled - Completed prematurely due to cancellation
- Error - Completed with one or more errors
- Fatal - Completed with one or more fatal (i.e. unrecoverable) errors
Worker Error Handling has more details on unsuccessful outcomes.
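The following sketch shows one way to observe the Status property before and after a run. It relies only on members named in this article. The actionETL namespace, the class and method names, and the use of Console output are assumptions for illustration.

```csharp
using System;
using System.Threading.Tasks;
using actionETL; // Assumed namespace for the library types used below

public static class StatusSketch
{
    // Hypothetical entry point; call it from your own async code.
    public static async Task RunAsync()
    {
        var workerSystem = new WorkerSystem("Status Demo")
            .Root(ws =>
            {
                new ActionWorker(ws, "Step 1", aw =>
                {
                    // ... work for this step
                });
            });

        // Instantiated but not yet started.
        Console.WriteLine("Before start: " + workerSystem.Status);

        await workerSystem.StartAsync();

        // After the run, Status holds one of the completion states
        // (Succeeded, Canceled, Error or Fatal).
        Console.WriteLine("After completion: " + workerSystem.Status);
    }
}
```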
Detailed Life-cycle
This section describes all the possible execution steps and callbacks of the worker system and its workers. It is normally not required reading, but can be useful to understand when performing low-level development.
Worker Life-cycle
All workers have a RunAsync() method that the system calls (during the Running phase of the worker), which typically contains all the worker specific logic, including initialization and cleanup.
The Running phase does however include additional places where logic can optionally be inserted, both at compile time and at runtime, to e.g. customize the initialization, cleanup, and error handling of existing workers.
The following is a detailed description of the worker life-cycle:
Created
1: The library user code creates the worker, e.g.
new Worker(parent, "No-op");
Running
The worker is started by the system, which will:
2: Change any output and error output ports' State from Created to Ready
3: Call and await any callbacks that have been added with AddStartingCallback(Func<WorkerBase, Task<ProgressStatus>>). If any callback returned a failure status, skip to step 8. Otherwise, if any callback returned a Succeeded status, empty any input ports, successfully complete any output ports, and skip to step 8.
4: Call and await the RunAsync() method, which is normally where the bulk of the worker logic resides
5: Call and await any callbacks that have been added with AddRanCallback(Func<WorkerBase, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>>), passing in the OutcomeStatus the previous step returned (normally Succeeded) and WorkerParentChildrenState.
6: Fail the worker if there are any active input or output ports, and empty the input ports. Successfully complete any error output ports.
7: If no errors above, call and await RunChildrenAsync() to run any worker children unless the library user has already called it
- You can call RunChildrenAsync() in RunAsync() or via AddRanCallback(), but then you must also await its completion.
- RunChildrenAsync() also calls and incorporates the result of any callbacks added with AddStartingChildrenCallback(Func<WorkerParent, Task<ProgressStatus>>).
8: Call and await any callbacks that have been added with AddCompletedCallback(Func<WorkerBase, OutcomeStatus, Task<OutcomeStatus>>), passing in the last OutcomeStatus of the previous steps (normally Succeeded)
9: Set the worker final Status to the last status from the previous steps, by default Succeeded
Note
Callbacks in steps 5 and 8 take the current status as input, and can change it (unless it's Fatal), e.g. from Error to Succeeded when handling a known error.
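As a hedged sketch of step 8, the callback below inspects the incoming status and passes it through unchanged. It assumes the AddCompletedCallback() signature listed above; the actionETL namespace, the class and method names, and the Console logging are illustrative assumptions. A callback that handles a known error would return a different OutcomeStatus instead of the one it received.

```csharp
using System;
using System.Threading.Tasks;
using actionETL; // Assumed namespace for the library types used below

public static class CompletedCallbackSketch
{
    // Hypothetical entry point; call it from your own async code.
    public static async Task RunAsync()
    {
        var workerSystem = new WorkerSystem("Callback Demo")
            .Root(ws =>
            {
                var step1 = new ActionWorker(ws, "Step 1", aw =>
                {
                    // ... work for this step
                });

                // Runs in step 8 of the life-cycle, only if the worker was started.
                step1.AddCompletedCallback((worker, outcomeStatus) =>
                {
                    Console.WriteLine("Step 1 completed with: " + outcomeStatus);

                    // Return the status unchanged; the value returned here
                    // becomes the input to step 9.
                    return Task.FromResult(outcomeStatus);
                });
            });

        await workerSystem.StartAsync();
    }
}
```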
Succeeded, Canceled, Error, or Fatal
The system performs additional steps after the worker is completed:
10: Call any AddChildCompletedCallback(Action<WorkerBase>) callbacks added to the worker parent
11: Remove any worker children, unless disabled with KeepChildrenLevels
12: Dispose any disposables added with DisposeOnFinished<TDisposable>(TDisposable)
WorkerSystem Life-cycle
The following is a detailed description of the WorkerSystem life-cycle, which has similar but fewer steps than a worker:
Created
1: The library user creates the worker system by using new, and adds logic for it to run, e.g.
var workerSystem = new WorkerSystem("Refresh Staging")
.Root(ws => { /* ... */ });
Running
2: The library user starts the worker system by calling StartAsync() or Start()
The system will:
3: Call and await any callbacks that have been added with AddStartingCallback(Func<TDerived, Task<ProgressStatus>>). If any callback returned Succeeded or a failure status, skip to step 6
4: Call and await the callback added with Root(), if any
5: If no errors above, call and await RunChildrenAsync() to run any worker children unless the library user has already called it in the Root() callback
- RunChildrenAsync() also calls and incorporates the result of any callbacks added with AddStartingChildrenCallback(Func<WorkerParent, Task<ProgressStatus>>).
6: Call and await any callbacks that have been added with AddCompletedCallback(Func<TDerived, OutcomeStatus, Task<OutcomeStatus>>), passing in the last OutcomeStatus of the previous steps (normally Succeeded)
7: Set the worker system final Status to the last status from the previous steps, by default Succeeded
Note
Callbacks in step 6 take the current status as input, and can change it (unless it's Fatal), e.g. from Error to Succeeded when handling a known error.
Succeeded, Canceled, Error, or Fatal
The system performs an additional step after setting the completion status:
8: Dispose any disposables added with DisposeOnFinished<TDisposable>(TDisposable)