Class WorkerSystemBase
An abstract class that creates and runs a system of workers. Use it to refer to worker systems when you don't know the exact derived type (e.g. WorkerSystem. Commonly used members include StartAsync(), Start(), and Config.
This class cannot be inherited directly. To create a custom worker system, derive from WorkerSystemBase<TDerived> and override RunAsync() to provide a custom implementation.
Implements
Inherited Members
Namespace: actionETL
Assembly: actionETL.dll
Syntax
public abstract class WorkerSystemBase : WorkerParent, IDisposeOnFinished
Properties
Config
Gets the configuration service. If a configuration service is not injected when creating the worker system, one will be automatically created, and populated from a default configuration location, if available.
Note: This property is thread-safe.
Declaration
public AConfig Config { get; }
Property Value
Type | Description |
---|---|
AConfig | The configuration service. |
CreationGuid
Gets an identifier that is unique to each created worker system. It is logged to the logging system. It can e.g. be added to user data rows, inserted into databases etc., to simplify linking different sets of user data to each other, and to link user data to logging information.
Note that the logging system uses the default text format: CreationGuid.ToString()
;
an example of a return value is "382c74c3-721d-4f34-80e5-57657b6cbc27".
Note: This property is thread-safe.
Declaration
public Guid CreationGuid { get; }
Property Value
Type | Description |
---|---|
Guid |
IsCancelRequested
Gets a value indicating whether canceling has been requested for the worker system. Long running workers can poll this property periodically, and halt processing on cancel.
Note: This property is thread-safe.
Declaration
public bool IsCancelRequested { get; }
Property Value
Type | Description |
---|---|
Boolean |
|
LicenseMaxWorkerTypes
Gets the maximum number of distinct worker types allowed for this worker system.
null
if unlimited.
Declaration
public int? LicenseMaxWorkerTypes { get; }
Property Value
Type | Description |
---|---|
Nullable<Int32> |
Methods
AddCompletedCallback(Func<WorkerSystemBase, OutcomeStatus, Task<OutcomeStatus>>)
Adds a callback which will be called as the last step of the worker system
Running
phase, which is after any children have completed.
Multiple callbacks can be added.
These callbacks are useful for performing cleanup tasks. Also see DisposeOnFinished<TDisposable>(TDisposable) and UsingActionWorker<TDisposable> which are specifically for disposing resources.
If the worker system is started, these callbacks will run, even if earlier callbacks, or any children have failures. If multiple callbacks have been added, they will all be called (even if one fails) in the reverse order from they were added, e.g. callbacks from derived classes will be called before callbacks from base classes.
Each callback takes the worker system itself (typed as WorkerSystemBase
) and the
OutcomeStatus
from previous steps (including any children) as parameters.
Note: Adding the callback is thread-safe.
Declaration
public WorkerSystemBase AddCompletedCallback(Func<WorkerSystemBase, OutcomeStatus, Task<OutcomeStatus>> completedFuncAsync)
Parameters
Type | Name | Description |
---|---|---|
Func<WorkerSystemBase, OutcomeStatus, Task<OutcomeStatus>> | completedFuncAsync | The asynchronous callback that will be called when the worker system has completed. See "Remarks" below for details. |
Returns
Type | Description |
---|---|
WorkerSystemBase | The instance itself, so you can chain multiple calls. |
Remarks
The callback takes two parameters:
WorkerSystemBase | The worker system itself |
OutcomeStatus | From previous execution steps (including any children) |
The callback implementation is either synchronous and returns a Task<OutcomeStatus>
explicitly, or is async
and returns an OutcomeStatus
.
- To leave the
OutcomeStatus
unchanged, return theOutcomeStatus
that was passed in as a parameter (and use ToTask() in synchronous callbacks). - To change the
OutcomeStatus
(e.g. changingError
toSucceeded
), return a newOutcomeStatus
instance. If appropriate, use Combine(Boolean, OutcomeStatus) to also retain the previous information, both of which the system will log.
If you need to access members of the derived worker system type, there are three options:
- Use AddCompletedCallback(Func<TDerived, OutcomeStatus, Task<OutcomeStatus>>) instead if you know the derived worker system type.
- If you have access to the source of the worker system type, implement the callback as a (non-static) method in that type. This allows accessing members without casting the worker system parameter to the derived worker system type.
- If you don't have access to the source of the worker system type, or you want to implement the callback using a lambda, cast the worker system parameter to the actual derived type of the worker system.
Examples
In this example we roll back a transaction on failure:
workerSystem.AddCompletedCallback((ws, outcomeStatus) =>
{
if (!outcomeStatus.IsSucceeded)
transaction?.Rollback();
return outcomeStatus.ToTask();
});
Exceptions
Type | Condition |
---|---|
ArgumentNullException |
|
InvalidOperationException | Cannot add 'Starting' callback after the worker system has completed. |
AddStartingCallback(Func<WorkerSystemBase, Task<ProgressStatus>>)
Adds a callback which will be called as the first step of the worker system Running
phase,
which is before Root(Action<WorkerSystem>) overloads and
RunAsync() is called or worker children are started.
Multiple callbacks can be added.
These callbacks are often used for initialization, or for adding child workers, e.g. from outside the worker system.
If the worker system is started, these callbacks will all run, even if some fail, in the order they were added, e.g. callbacks added in base classes will be called before callbacks added in derived classes.
The callback takes the worker system itself as a parameter, typed as
WorkerSystemBase
.
Note: Adding the callback is thread-safe.
Declaration
public WorkerSystemBase AddStartingCallback(Func<WorkerSystemBase, Task<ProgressStatus>> startingFuncAsync)
Parameters
Type | Name | Description |
---|---|---|
Func<WorkerSystemBase, Task<ProgressStatus>> | startingFuncAsync | The asynchronous callback that will be called when the worker system has started. See "Remarks" below for details. |
Returns
Type | Description |
---|---|
WorkerSystemBase | The instance itself, so you can chain multiple calls. |
Remarks
The callback takes the worker system itself as a parameter, typed as
WorkerSystemBase
.
If you need to access members of a derived worker system type, there are three options:
- Use AddStartingCallback(Func<TDerived, Task<ProgressStatus>>) instead if you know the derived worker system type.
- If you have access to the source of the worker system type, implement the callback as a (non-static) method in that worker. This allows accessing worker system members without casting the worker parameter to the derived worker system type.
- If you don't have access to the source of the worker system type, or you want to implement the callback using a lambda, cast the worker system parameter to the actual derived type of the worker system.
The callback implementation is either synchronous and returns a
Task<ProgressStatus>
explicitly, or is async
and returns a
ProgressStatus
.
The callback should return:
ProgressStatus.NotCompletedTask
(orProgressStatus.NotCompleted
) to continue normal processing.ProgressStatus.SucceededTask
(orProgressStatus.Succeeded
) to skip calling theRoot()
and AddStartingChildrenCallback(Func<WorkerParent, Task<ProgressStatus>>) callbacks, and child workers will not be started. All otherAddStartingCallback()
, AddCompletedCallback(Func<WorkerSystemBase, OutcomeStatus, Task<OutcomeStatus>>), and parent AddChildCompletedCallback(Action<WorkerBase>) callbacks will be called.- A failure
ProgressStatus
on failure, which will give the worker system that failure status. TheRoot()
andAddStartingChildrenCallback()
callbacks won't be called, and child workers will not be started. All otherAddStartingCallback()
,AddCompletedCallback()
, and parentAddChildCompletedCallbacks()
callbacks will be called.
Examples
In this example we allocate a buffer:
workerSystem.AddStartingCallback(ws =>
{
_buffer = new int[BufferSize];
return ProgressStatus.NotCompletedTask;
});
Exceptions
Type | Condition |
---|---|
ArgumentNullException |
|
InvalidOperationException | Cannot add 'Starting' callback after the worker system has started running. |
Start()
Starts the worker system synchronously. This will run any user added callbacks, the RunAsync() method, and any workers they add. With the out-of-box WorkerSystem, this includes any Root(Action<WorkerSystem>) overload callback.
Note: Only call this method from the default SynchronizationContext
, i.e. from
console programs. The recommended way to start the worker system is via
StartAsync(), which also works with all types of programs, including GUI, etc.
Declaration
public virtual SystemOutcomeStatus Start()
Returns
Type | Description | ||||||||
---|---|---|---|---|---|---|---|---|---|
SystemOutcomeStatus | A SystemOutcomeStatus that represents the completion of the worker system, i.e. when both RunAsync() and user callbacks have completed, and no more workers are running or can be started.
|
Exceptions
Type | Condition |
---|---|
InvalidOperationException |
|
StartAsync()
Starts the worker system asynchronously, which is the recommended way. This will run any user added callbacks, the RunAsync() method, and any workers they add. With the out-of-box WorkerSystem, this includes any Root(Action<WorkerSystem>) overload callback.
Also see Start().
Declaration
public virtual Task<SystemOutcomeStatus> StartAsync()
Returns
Type | Description | ||||||||
---|---|---|---|---|---|---|---|---|---|
Task<SystemOutcomeStatus> | A SystemOutcomeStatus that represents the completion of the worker system, i.e. when both RunAsync() and user callbacks have completed, and no more workers are running or can be started.
|
Exceptions
Type | Condition |
---|---|
InvalidOperationException | The worker system can only be started once. |
TryCancel(String)
Cancel the worker system (unless already completed or no more workers to run), which stops
dispatching workers by setting their OutcomeStatus
to Canceled
, without executing
the worker itself. Any workers that are already running will complete as per normal.
By default (i.e. with EscalateError
set to true
), failed workers will
escalate any error to their parents, recursively, including to the WorkerSystem
.
Note: This method is thread-safe.
Declaration
public bool TryCancel(string message)
Parameters
Type | Name | Description |
---|---|---|
String | message | The message. Can be empty or |
Returns
Type | Description |
---|---|
Boolean |
|