Class WorkerBase
The main unit of execution. Specify this class when referring to workers without knowing the final derived worker type. All workers inherit from this class, indirectly via WorkerBase<TDerived>. Use this latter class when you do know the final derived worker type.
When starting a worker, the library calls RunAsync() during the worker Running phase. A derived class must override this abstract method, which normally contains the bulk of the worker logic.
Note that the worker Running phase also includes additional places where logic can
optionally be inserted via callbacks, to e.g. customize the initialization, cleanup,
and error handling of existing workers. This is mostly used when customizing workers that
are not designed to be derived from (i.e. without a "Base" suffix).
See
Worker Life-cycle for details.
See Workers for how to use workers, and see Custom Workers for how to develop your own workers.
Implements
Inherited Members
Namespace: actionETL
Assembly: actionETL.dll
Syntax
public abstract class WorkerBase : WorkerParent, IDisposeOnFinished
Properties
ErroredPortErrorsWorkerProtected
Get or sets whether an Error port should automatically fault the worker (the default), or not.
If a setting to control this is presented to the library user, it is recommended this
is done as a public bool property on the worker with the name ErroredPortErrorsWorker.
Set to true (the default) to automatically fault the worker if any port is errored.
Set to false to not automatically fault the worker if a port is errored. This can
e.g. be useful for a worker with multiple inputs where it is acceptable that some (but not all)
input ports fail.
Note that a Fatal port will always fault the worker, irrespective of this setting.
Note: This property is thread-safe.
Declaration
protected bool ErroredPortErrorsWorkerProtected { get; set; }
Property Value
| Type | Description |
|---|---|
| Boolean |
ErrorOutputs
Gets the error output ports. Note that these are 'untyped', and do not specify the type of the error output rows. They are however useful for performing all operations that do not require the row type of the port.
Note that workers with ports typically add additional members that allow access to typed versions of the ports.
Note: This property is thread-safe.
Declaration
public ErrorOutputPortCollection ErrorOutputs { get; }
Property Value
| Type | Description |
|---|---|
| ErrorOutputPortCollection | The error output ports. |
EscalateError
Gets or sets a value indicating whether to escalate an Error worker completion to
the parent worker, defaults to true.
Note that a Fatal status will always be escalated, irrespective
of this setting.
Cannot be set after the worker has started running.
Suppressing the escalation allows handling or taking an alternative action on error, which usually involves setting IsStartable constraints.
Suppressing the escalation can also be used to implement retry-on-error functionality, typically
by using an iterating worker such as WhileActionWorker<T> or
ForEachActionWorker<TItem>, and setting EscalateError to false on
some of their child workers.
Note: This property is thread-safe.
Declaration
public bool EscalateError { get; set; }
Property Value
| Type | Description |
|---|---|
| Boolean |
|
Exceptions
| Type | Condition |
|---|---|
| InvalidOperationException | Cannot set the value after the worker has started running. |
Inputs
Gets the input ports. Note that these are 'untyped', and do not specify the type of the input rows. They are however useful for performing all operations that do not require the row type of the port.
Note that workers with ports typically add additional members that allow access to typed versions of the ports.
Note: This property is thread-safe.
Declaration
public InputPortCollection Inputs { get; }
Property Value
| Type | Description |
|---|---|
| InputPortCollection | The input ports. |
IsStartable
A callback that limits when a worker can be started. Set this (usually via a constructor parameter) to define a custom start constraint (also known as a dependency). After the worker has started, this property is unused, and can then be set.
The default value is DefaultIsStartable(): workers without input ports are
always startable, and workers with input ports (i.e. transforms and targets) are blocked
until at least one input has gone beyond the Ready state (i.e. until
AnyActivated becomes true).
Note however that to avoid dataflow deadlocks, all dataflow source workers that are linked to each other via ports (normally indirectly via transforms and targets) will be started as a group, but only when all of their (default or custom) start constraints are startable.
Also note that MaxRunningChildren can stop a worker from being run.
For further details, see Start Constraints.
Note: This property is thread-safe.
Declaration
public Func<bool> IsStartable { get; set; }
Property Value
| Type | Description |
|---|---|
| Func<Boolean> | A callback (often an anonymous function) that returns Set to |
Exceptions
| Type | Condition |
|---|---|
| InvalidOperationException | Cannot set the value after the worker has started running. |
Outputs
Gets the output ports. Note that these are 'untyped', and do not specify the type of the output rows. They are however useful for performing all operations that do not require the row type of the port.
Note that workers with ports typically add additional members that allow access to typed versions of the ports.
Note: This property is thread-safe.
Declaration
public OutputPortCollection Outputs { get; }
Property Value
| Type | Description |
|---|---|
| OutputPortCollection | The output ports. |
Parent
Reference to the parent of this worker.
Note: This property is thread-safe.
Declaration
public WorkerParent Parent { get; }
Property Value
| Type | Description |
|---|---|
| WorkerParent |
Methods
AddCompletedCallback(Func<WorkerBase, OutcomeStatus, Task<OutcomeStatus>>)
Adds a callback which will be called as the last step of the worker
Running phase, which is after any children and ports have completed.
Multiple callbacks can be added.
These callbacks are useful for performing cleanup tasks. Also see DisposeOnFinished<TDisposable>(TDisposable) and UsingActionWorker<TDisposable> which are specifically for disposing resources.
If the worker is started, these callbacks will run, even if earlier callbacks, RunAsync(), or any children have failures. If multiple callbacks have been added, they will all be called (even if one fails) in the reverse order from they were added, e.g. callbacks from derived classes will be called before callbacks from base classes.
Each callback takes the worker itself (typed as WorkerBase) and the
OutcomeStatus from previous steps (including any children) as parameters.
Note: Adding the callback is thread-safe.
Declaration
public WorkerBase AddCompletedCallback(Func<WorkerBase, OutcomeStatus, Task<OutcomeStatus>> completedFuncAsync)
Parameters
| Type | Name | Description |
|---|---|---|
| Func<WorkerBase, OutcomeStatus, Task<OutcomeStatus>> | completedFuncAsync | The asynchronous callback that will be called when the worker has completed. |
Returns
| Type | Description |
|---|---|
| WorkerBase | The instance itself, so you can chain multiple calls. |
Remarks
The callback takes two parameters:
WorkerBase | The worker itself |
OutcomeStatus | From previous execution steps (including any children) |
The callback implementation is either synchronous and returns a Task<OutcomeStatus>
explicitly, or is async and returns an OutcomeStatus.
- To leave the
OutcomeStatusunchanged, return theOutcomeStatusthat was passed in as a parameter (and use ToTask() in synchronous callbacks). - To change the
OutcomeStatus(e.g. changingErrortoSucceeded), return a newOutcomeStatusinstance. If appropriate, use Combine(Boolean, OutcomeStatus) to also retain the previous information, both of which the system will log.
If you need to access members of the derived worker type, there are three options:
- Use AddCompletedCallback(Func<TDerived, OutcomeStatus, Task<OutcomeStatus>>) instead if you know the derived worker type.
- If you have access to the source of the worker, implement the callback as a (non-static) method in that worker. This allows accessing worker members without casting the worker parameter to the derived worker type.
- If you don't have access to the source of the worker, or you want to implement the callback using a lambda, cast the worker parameter to the actual derived type of the worker.
Examples
In this example we roll back a transaction on failure:
worker.AddCompletedCallback((w, outcomeStatus) =>
{
if (!outcomeStatus.IsSucceeded)
transaction?.Rollback();
return outcomeStatus.ToTask();
});
Exceptions
| Type | Condition |
|---|---|
| ArgumentNullException |
|
| InvalidOperationException | Cannot add 'Starting' callback after the worker has completed. |
AddRanCallback(Func<WorkerBase, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>>)
Adds a callback which will be called immediately after RunAsync() finishes, e.g. for performing cleanup.
Note that after these callbacks return, ports will be checked for any error status
or not being completed, and child workers will be run (if not already run
explicitly, or the worker RunAsync() failed), both of which can
fail the current worker.
If multiple callbacks have been added, they will all be called (even if one fails) in the reverse order from they were added, e.g. callbacks from derived classes will be called before callbacks from base classes.
If RunAsync() does not get called (due to an
AddStartingCallback() callback), AddRanCallback() callbacks will not run.
Each callback takes the worker itself, the OutcomeStatus from RunAsync(),
and WorkerParentChildrenState as parameters.
Note: Adding the callback is thread-safe.
Declaration
public WorkerBase AddRanCallback(Func<WorkerBase, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>> ranFuncAsync)
Parameters
| Type | Name | Description |
|---|---|---|
| Func<WorkerBase, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>> | ranFuncAsync | The asynchronous callback that will be called immediately after RunAsync() finishes. See "Remarks" below for details. |
Returns
| Type | Description |
|---|---|
| WorkerBase | The instance itself, so you can chain multiple calls. |
Remarks
The callback takes three parameters:
WorkerBase | The worker itself |
OutcomeStatus | The RunAsync return value |
| WorkerParentChildrenState | Describes whether children have already been added or completed. If completed, you must RemoveChildren() before adding any new child workers. You can also call a RunChildrenAsync() overload, but must then also await its completion. |
The callback implementation is either synchronous and returns a
Task<OutcomeStatus> explicitly, or is async and returns an
OutcomeStatus.
- To leave the
OutcomeStatusunchanged, return theOutcomeStatusthat was passed in as a parameter (and use ToTask() in synchronous callbacks). - To change the
OutcomeStatus(e.g. changingErrortoSucceeded), return a newOutcomeStatusinstance. If appropriate, use Combine(Boolean, OutcomeStatus) to also retain the previous information, both of which the system will log.
If you need to access members of the derived worker type, there are three options:
- Use AddRanCallback(Func<TDerived, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>>) instead if you know the derived worker type.
- If you have access to the source of the worker, implement the callback as a (non-static) method in that worker. This allows accessing worker members without casting the worker parameter to the derived worker type.
- If you don't have access to the source of the worker, or you want to implement the callback using a lambda, cast the worker parameter to the actual derived type of the worker.
Examples
In this example we roll back a transaction on failure:
worker.AddRanCallback((w, outcomeStatus, childrenStatus) =>
{
if (!outcomeStatus.IsSucceeded)
transaction?.Rollback();
return outcomeStatus.ToTask();
});
Exceptions
| Type | Condition |
|---|---|
| ArgumentNullException |
|
| InvalidOperationException | Cannot add 'Ran' callback after the worker has completed. |
AddStartingCallback(Func<WorkerBase, Task<ProgressStatus>>)
Adds a callback which will be called as the first step of the worker Running phase,
which is before RunAsync() is called.
Multiple callbacks can be added.
Also see AddStartingCallback(Func<TDerived, Task<ProgressStatus>>).
These callbacks are often used for initialization, or for adding child workers, e.g. when not having access to the worker implementation.
If the worker is started, these callbacks will run, even if some fail, in the order they were added, e.g. callbacks added in base classes will be called before callbacks added in derived classes.
The callback takes the worker itself as a parameter, typed as WorkerBase.
Note: Adding the callback is thread-safe.
Declaration
public WorkerBase AddStartingCallback(Func<WorkerBase, Task<ProgressStatus>> startingFuncAsync)
Parameters
| Type | Name | Description |
|---|---|---|
| Func<WorkerBase, Task<ProgressStatus>> | startingFuncAsync | The asynchronous callback that will be called when the worker has started. See "Remarks" below for details. |
Returns
| Type | Description |
|---|---|
| WorkerBase | The instance itself, so you can chain multiple calls. |
Remarks
The callback takes the worker itself as a parameter, typed as WorkerBase.
If you need to access members of the derived worker type, there are three options:
- Use AddStartingCallback(Func<TDerived, Task<ProgressStatus>>) instead if you know the derived worker type.
- If you have access to the source of the worker, implement the callback as a (non-static) method in that worker. This allows accessing worker members without casting the worker parameter to the derived worker type.
- If you don't have access to the source of the worker, or you want to implement the callback using a lambda, cast the worker parameter to the actual derived type of the worker.
The callback implementation is either synchronous and returns a
Task<ProgressStatus> explicitly, or is async and returns a
ProgressStatus.
The callback should return:
ProgressStatus.NotCompletedTask(orProgressStatus.NotCompleted) to continue normal processing.ProgressStatus.SucceededTask(orProgressStatus.Succeeded) to silently empty any input ports and transmit SendSucceeded() to any output ports. RunAsync(),AddRanCallback()andAddStartingChildrenCallback()callbacks will not be called, and child workers will not be started. All otherAddStartingCallback(),AddCompletedCallbackand parent AddChildCompletedCallback(Action<WorkerBase>) callbacks will be called.- A failure
ProgressStatuson failure, which will give the worker that failure status. NeitherRunAsync()norAddRanCallback()callbacks will be called, and child workers will not be started. All otherAddStartingCallback(),AddCompletedCallback()and parentAddChildCompletedCallback()callbacks will however be called.
Examples
In this example we allocate a row buffer:
worker.AddStartingCallback(w =>
{
_outputRows = new TOutput[Output.RowsPerBuffer];
return ProgressStatus.NotCompletedTask;
});
Exceptions
| Type | Condition |
|---|---|
| ArgumentNullException |
|
| InvalidOperationException | Cannot add 'Starting' callback after the worker has started running. |
DefaultIsStartable()
The default callback used by IsStartable, unless overridden by the library user.
With this default callback, workers without input ports are always startable,
and workers with input ports (i.e. transforms and targets) are blocked until at least
one input has gone beyond the Ready state (i.e. until AnyActivated
becomes true). This default callback can be used to incorporate the default start constraint with
your custom start constraint, e.g.:
worker.IsStartable = () => worker.DefaultIsStartable() && MyCustomStartableCondition;
Note however that to avoid dataflow deadlocks, all dataflow source workers that are linked to each other via ports (normally indirectly via transforms and targets) will be started as a group, but only when all of their (default or custom) start constraints are startable.
Also note that MaxRunningChildren can stop a worker from being run.
Note: This method is thread-safe.
Declaration
public bool DefaultIsStartable()
Returns
| Type | Description |
|---|---|
| Boolean |
|
SucceededSequence<TLastWorker>(TLastWorker)
Set the IsStartable function to run the workers in sequence, i.e. run a worker
if the preceding worker succeeds. This avoids having to explicitly set the IsStartable function
on each worker.
Declaration
public TLastWorker SucceededSequence<TLastWorker>(TLastWorker firstWorker)
where TLastWorker : WorkerBase
Parameters
| Type | Name | Description |
|---|---|---|
| TLastWorker | firstWorker | First worker to add. |
Returns
| Type | Description |
|---|---|
| TLastWorker |
Type Parameters
| Name | Description |
|---|---|
| TLastWorker |
Exceptions
| Type | Condition |
|---|---|
| ArgumentNullException | Worker number. |
| ArgumentException | Can only use SucceededSequence<TLastWorker>(TLastWorker) with worker siblings, not with worker descendants or ancestors. |
SucceededSequence<TLastWorker>(WorkerBase, TLastWorker)
Set the IsStartable function to run the workers in sequence, i.e. run a worker
if the preceding worker succeeds. This avoids having to explicitly set the IsStartable function
on each worker.
Declaration
public TLastWorker SucceededSequence<TLastWorker>(WorkerBase firstWorker, TLastWorker secondWorker)
where TLastWorker : WorkerBase
Parameters
| Type | Name | Description |
|---|---|---|
| WorkerBase | firstWorker | First worker to add. |
| TLastWorker | secondWorker | Second worker to add. |
Returns
| Type | Description |
|---|---|
| TLastWorker |
Type Parameters
| Name | Description |
|---|---|
| TLastWorker |
Exceptions
| Type | Condition |
|---|---|
| ArgumentNullException | Worker number. |
| ArgumentException | Can only use SucceededSequence<TLastWorker>(TLastWorker) with worker siblings, not with worker descendants or ancestors. |
SucceededSequence<TLastWorker>(WorkerBase, WorkerBase, TLastWorker)
Set the IsStartable function to run the workers in sequence, i.e. run a worker
if the preceding worker succeeds. This avoids having to explicitly set the IsStartable function
on each worker.
Declaration
public TLastWorker SucceededSequence<TLastWorker>(WorkerBase firstWorker, WorkerBase secondWorker, TLastWorker thirdWorker)
where TLastWorker : WorkerBase
Parameters
| Type | Name | Description |
|---|---|---|
| WorkerBase | firstWorker | First worker to add. |
| WorkerBase | secondWorker | Second worker to add. |
| TLastWorker | thirdWorker | Third worker to add. |
Returns
| Type | Description |
|---|---|
| TLastWorker |
Type Parameters
| Name | Description |
|---|---|
| TLastWorker |
Exceptions
| Type | Condition |
|---|---|
| ArgumentNullException | Worker number. |
| ArgumentException | Can only use SucceededSequence<TLastWorker>(TLastWorker) with worker siblings, not with worker descendants or ancestors. |
SucceededSequence<TLastWorker>(WorkerBase, WorkerBase, WorkerBase, TLastWorker)
Set the IsStartable function to run the workers in sequence, i.e. run a worker
if the preceding worker succeeds. This avoids having to explicitly set the IsStartable function
on each worker.
Declaration
public TLastWorker SucceededSequence<TLastWorker>(WorkerBase firstWorker, WorkerBase secondWorker, WorkerBase thirdWorker, TLastWorker fourthWorker)
where TLastWorker : WorkerBase
Parameters
| Type | Name | Description |
|---|---|---|
| WorkerBase | firstWorker | First worker to add. |
| WorkerBase | secondWorker | Second worker to add. |
| WorkerBase | thirdWorker | Third worker to add. |
| TLastWorker | fourthWorker | Fourth worker to add. |
Returns
| Type | Description |
|---|---|
| TLastWorker |
Type Parameters
| Name | Description |
|---|---|
| TLastWorker |
Exceptions
| Type | Condition |
|---|---|
| ArgumentNullException | Worker number. |
| ArgumentException | Can only use SucceededSequence<TLastWorker>(TLastWorker) with worker siblings, not with worker descendants or ancestors. |
SucceededSequence<TLastWorker>(WorkerBase, WorkerBase, WorkerBase, WorkerBase, TLastWorker)
Set the IsStartable function to run the workers in sequence, i.e. run a worker
if the preceding worker succeeds. This avoids having to explicitly set the IsStartable function
on each worker.
Declaration
public TLastWorker SucceededSequence<TLastWorker>(WorkerBase firstWorker, WorkerBase secondWorker, WorkerBase thirdWorker, WorkerBase fourthWorker, TLastWorker fifthWorker)
where TLastWorker : WorkerBase
Parameters
| Type | Name | Description |
|---|---|---|
| WorkerBase | firstWorker | First worker to add. |
| WorkerBase | secondWorker | Second worker to add. |
| WorkerBase | thirdWorker | Third worker to add. |
| WorkerBase | fourthWorker | Fourth worker to add. |
| TLastWorker | fifthWorker | Fifth worker to add. |
Returns
| Type | Description |
|---|---|
| TLastWorker |
Type Parameters
| Name | Description |
|---|---|
| TLastWorker |
Exceptions
| Type | Condition |
|---|---|
| ArgumentNullException | Worker number. |
| ArgumentException | Can only use SucceededSequence<TLastWorker>(TLastWorker) with worker siblings, not with worker descendants or ancestors. |
SucceededSequence<TLastWorker>(WorkerBase, WorkerBase, WorkerBase, WorkerBase, WorkerBase, TLastWorker)
Set the IsStartable function to run the workers in sequence, i.e. run a worker
if the preceding worker succeeds. This avoids having to explicitly set the IsStartable function
on each worker.
Declaration
public TLastWorker SucceededSequence<TLastWorker>(WorkerBase firstWorker, WorkerBase secondWorker, WorkerBase thirdWorker, WorkerBase fourthWorker, WorkerBase fifthWorker, TLastWorker sixthWorker)
where TLastWorker : WorkerBase
Parameters
| Type | Name | Description |
|---|---|---|
| WorkerBase | firstWorker | First worker to add. |
| WorkerBase | secondWorker | Second worker to add. |
| WorkerBase | thirdWorker | Third worker to add. |
| WorkerBase | fourthWorker | Fourth worker to add. |
| WorkerBase | fifthWorker | Fifth worker to add. |
| TLastWorker | sixthWorker | Sixth worker to add. |
Returns
| Type | Description |
|---|---|
| TLastWorker |
Type Parameters
| Name | Description |
|---|---|
| TLastWorker |
Exceptions
| Type | Condition |
|---|---|
| ArgumentNullException | Worker number. |
| ArgumentException | Can only use SucceededSequence<TLastWorker>(TLastWorker) with worker siblings, not with worker descendants or ancestors. |