    Class WorkerSystemBase

    An abstract class that creates and runs a system of workers. Use it to refer to worker systems when you don't know the exact derived type (e.g. WorkerSystem). Commonly used members include StartAsync(), Start(), and Config.

    This class cannot be inherited directly. To create a custom worker system, derive from WorkerSystemBase<TDerived> and override RunAsync() to provide a custom implementation.

    Inheritance
    Object
    WorkerParent
    WorkerSystemBase
    WorkerSystemBase<TDerived>
    Implements
    IDisposeOnFinished
    Inherited Members
    WorkerParent.AddChildCompletedCallback(Action<WorkerBase>)
    WorkerParent.AddStartingChildrenCallback(Func<WorkerParent, Task<ProgressStatus>>)
    WorkerParent.BytesPerRowBuffer
    WorkerParent.Children
    WorkerParent.DisposeOnFinished<TDisposable>(TDisposable)
    WorkerParent.GetDownstreamFactory<TInput>()
    WorkerParent.HasChildren
    WorkerParent.IsCanceled
    WorkerParent.IsCompleted
    WorkerParent.IsCreated
    WorkerParent.IsError
    WorkerParent.IsFailed
    WorkerParent.IsFatal
    WorkerParent.IsRunning
    WorkerParent.IsSucceeded
    WorkerParent.KeepChildrenLevels
    WorkerParent.Locator
    WorkerParent.LogFactory
    WorkerParent.Logger
    WorkerParent.MaxRunningChildren
    WorkerParent.Name
    WorkerParent.RemoveChildren()
    WorkerParent.RescheduleChildren()
    WorkerParent.RunAsync()
    WorkerParent.RunChildrenAsync(Boolean)
    WorkerParent.RunChildrenAsync()
    WorkerParent.Status
    WorkerParent.Item[String]
    WorkerParent.ToString()
    WorkerParent.WorkerSystem
    WorkerParent.DebugCommands
    WorkerParent.AggregateErrorOutputRows
    WorkerParent.AggregateOutputRows
    WorkerParent.AggregateWorkersCompleted
    WorkerParent.InstantCompleted
    WorkerParent.InstantCreated
    WorkerParent.InstantStarted
    WorkerParent.RunningDuration
    Namespace: actionETL
    Assembly: actionETL.dll
    Syntax
    public abstract class WorkerSystemBase : WorkerParent, IDisposeOnFinished

    Properties

    Config

    Gets the configuration service. If a configuration service is not injected when creating the worker system, one will be automatically created, and populated from a default configuration location, if available.

    Note: This property is thread-safe.

    Declaration
    public AConfig Config { get; }
    Property Value
    Type Description
    AConfig

    The configuration service.

    CreationGuid

    Gets an identifier that is unique to each created worker system. It is logged to the logging system. It can e.g. be added to user data rows, inserted into databases etc., to simplify linking different sets of user data to each other, and to link user data to logging information.

    Note that the logging system uses the default text format: CreationGuid.ToString(); an example of a return value is "382c74c3-721d-4f34-80e5-57657b6cbc27".

    Note: This property is thread-safe.

    Declaration
    public Guid CreationGuid { get; }
    Property Value
    Type Description
    Guid

    IsCancelRequested

    Gets a value indicating whether canceling has been requested for the worker system. Long running workers can poll this property periodically, and halt processing on cancel.

    Note: This property is thread-safe.

    Declaration
    public bool IsCancelRequested { get; }
    Property Value
    Type Description
    Boolean

    true if canceling has been requested; otherwise, false.
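
    As a minimal sketch of the polling pattern described above, the hypothetical helper below checks IsCancelRequested between units of work and stops early once canceling has been requested. The class name, method name, and the doWork delegate are illustrative assumptions; only IsCancelRequested comes from this class.

```csharp
using System;
using actionETL;

public static class CancelPollingSketch
{
    // Hypothetical helper: runs up to 'batchCount' units of work and returns
    // how many were actually run before a cancel request was observed.
    public static int ProcessBatches(
        WorkerSystemBase workerSystem, int batchCount, Action<int> doWork)
    {
        int processed = 0;
        for (int batch = 0; batch < batchCount; batch++)
        {
            // IsCancelRequested is documented as thread-safe, so it can be
            // polled from a long-running loop.
            if (workerSystem.IsCancelRequested)
                break;

            doWork(batch);
            processed++;
        }
        return processed;
    }
}
```

    Polling between batches rather than per row keeps the overhead low; how often to poll is a judgment call that depends on how long one unit of work takes.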

    LicenseMaxWorkerTypes

    Gets the maximum number of distinct worker types allowed for this worker system. null if unlimited.

    Declaration
    public int? LicenseMaxWorkerTypes { get; }
    Property Value
    Type Description
    Nullable<Int32>

    Methods

    AddCompletedCallback(Func<WorkerSystemBase, OutcomeStatus, Task<OutcomeStatus>>)

    Adds a callback which will be called as the last step of the worker system Running phase, which is after any children have completed. Multiple callbacks can be added.

    These callbacks are useful for performing cleanup tasks. Also see DisposeOnFinished<TDisposable>(TDisposable) and UsingActionWorker<TDisposable> which are specifically for disposing resources.

    If the worker system is started, these callbacks will run, even if earlier callbacks or any children have failures. If multiple callbacks have been added, they will all be called (even if one fails) in the reverse of the order in which they were added, e.g. callbacks from derived classes will be called before callbacks from base classes.

    Each callback takes the worker system itself (typed as WorkerSystemBase) and the OutcomeStatus from previous steps (including any children) as parameters.

    Note: Adding the callback is thread-safe.

    Declaration
    public WorkerSystemBase AddCompletedCallback(Func<WorkerSystemBase, OutcomeStatus, Task<OutcomeStatus>> completedFuncAsync)
    Parameters
    Type Name Description
    Func<WorkerSystemBase, OutcomeStatus, Task<OutcomeStatus>> completedFuncAsync

    The asynchronous callback that will be called when the worker system has completed. See "Remarks" below for details.

    Returns
    Type Description
    WorkerSystemBase

    The instance itself, so you can chain multiple calls.

    Remarks

    The callback takes two parameters:

    WorkerSystemBase: The worker system itself
    OutcomeStatus: From previous execution steps (including any children)

    The callback implementation is either synchronous and returns a Task<OutcomeStatus> explicitly, or is async and returns an OutcomeStatus.

    • To leave the OutcomeStatus unchanged, return the OutcomeStatus that was passed in as a parameter (and use ToTask() in synchronous callbacks).
    • To change the OutcomeStatus (e.g. changing Error to Succeeded), return a new OutcomeStatus instance. If appropriate, use Combine(Boolean, OutcomeStatus) to also retain the previous information, both of which the system will log.

    If you need to access members of the derived worker system type, there are three options:

    • Use AddCompletedCallback(Func<TDerived, OutcomeStatus, Task<OutcomeStatus>>) instead if you know the derived worker system type.
    • If you have access to the source of the worker system type, implement the callback as a (non-static) method in that type. This allows accessing members without casting the worker system parameter to the derived worker system type.
    • If you don't have access to the source of the worker system type, or you want to implement the callback using a lambda, cast the worker system parameter to the actual derived type of the worker system.

    Examples

    In this example we roll back a transaction on failure:

    workerSystem.AddCompletedCallback((ws, outcomeStatus) =>
    {
        if (!outcomeStatus.IsSucceeded)
            transaction?.Rollback();
        return outcomeStatus.ToTask();
    });
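
    The Remarks above also allow an async callback that returns an OutcomeStatus directly. The following is a sketch only: the class and method names are hypothetical, and Task.Delay stands in for real asynchronous cleanup work. It also chains two calls, relying on the documented return value.

```csharp
using System.Threading.Tasks;
using actionETL;

public static class CompletedCallbackSketch
{
    // Hypothetical helper that registers two completed callbacks.
    public static WorkerSystemBase AddCleanup(WorkerSystemBase workerSystem)
    {
        return workerSystem
            // Async form: the lambda returns an OutcomeStatus.
            .AddCompletedCallback(async (ws, outcomeStatus) =>
            {
                await Task.Delay(10); // stand-in for asynchronous cleanup
                return outcomeStatus; // leave the status unchanged
            })
            // Synchronous form: returns a Task<OutcomeStatus> via ToTask().
            // Added last, so per the documented reverse ordering it is
            // called before the async callback above.
            .AddCompletedCallback((ws, outcomeStatus) => outcomeStatus.ToTask());
    }
}
```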
    Exceptions
    Type Condition
    ArgumentNullException

    completedFuncAsync

    InvalidOperationException

    Cannot add 'Starting' callback after the worker system has completed.

    AddStartingCallback(Func<WorkerSystemBase, Task<ProgressStatus>>)

    Adds a callback which will be called as the first step of the worker system Running phase, i.e. before any Root(Action<WorkerSystem>) overload callback or RunAsync() is called, and before any worker children are started. Multiple callbacks can be added.

    These callbacks are often used for initialization, or for adding child workers, e.g. from outside the worker system.

    If the worker system is started, these callbacks will all run, even if some fail, in the order they were added, e.g. callbacks added in base classes will be called before callbacks added in derived classes.

    The callback takes the worker system itself as a parameter, typed as WorkerSystemBase.

    Note: Adding the callback is thread-safe.

    Declaration
    public WorkerSystemBase AddStartingCallback(Func<WorkerSystemBase, Task<ProgressStatus>> startingFuncAsync)
    Parameters
    Type Name Description
    Func<WorkerSystemBase, Task<ProgressStatus>> startingFuncAsync

    The asynchronous callback that will be called when the worker system has started. See "Remarks" below for details.

    Returns
    Type Description
    WorkerSystemBase

    The instance itself, so you can chain multiple calls.

    Remarks

    The callback takes the worker system itself as a parameter, typed as WorkerSystemBase.

    If you need to access members of a derived worker system type, there are three options:

    • Use AddStartingCallback(Func<TDerived, Task<ProgressStatus>>) instead if you know the derived worker system type.
    • If you have access to the source of the worker system type, implement the callback as a (non-static) method in that type. This allows accessing worker system members without casting the worker system parameter to the derived worker system type.
    • If you don't have access to the source of the worker system type, or you want to implement the callback using a lambda, cast the worker system parameter to the actual derived type of the worker system.

    The callback implementation is either synchronous and returns a Task<ProgressStatus> explicitly, or is async and returns a ProgressStatus.

    The callback should return:

    • ProgressStatus.NotCompletedTask (or ProgressStatus.NotCompleted) to continue normal processing.
    • ProgressStatus.SucceededTask (or ProgressStatus.Succeeded) to skip calling the Root() and AddStartingChildrenCallback(Func<WorkerParent, Task<ProgressStatus>>) callbacks, and child workers will not be started. All other AddStartingCallback(), AddCompletedCallback(Func<WorkerSystemBase, OutcomeStatus, Task<OutcomeStatus>>), and parent AddChildCompletedCallback(Action<WorkerBase>) callbacks will be called.
    • A failure ProgressStatus on failure, which will give the worker system that failure status. The Root() and AddStartingChildrenCallback() callbacks won't be called, and child workers will not be started. All other AddStartingCallback(), AddCompletedCallback(), and parent AddChildCompletedCallback() callbacks will be called.

    Examples

    In this example we allocate a buffer:

    workerSystem.AddStartingCallback(ws =>
    {
        _buffer = new int[BufferSize];
        return ProgressStatus.NotCompletedTask;
    });
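
    The return values listed in the Remarks can also be chosen at run time. The sketch below uses an async callback that returns ProgressStatus.Succeeded to skip Root() and the child workers when there is nothing to do, and ProgressStatus.NotCompleted otherwise. The class name, method name, and the hasWorkAsync delegate are hypothetical and only serve the illustration.

```csharp
using System;
using System.Threading.Tasks;
using actionETL;

public static class StartingCallbackSketch
{
    // Hypothetical helper: 'hasWorkAsync' is an assumed caller-supplied
    // check, e.g. whether any input files exist.
    public static WorkerSystemBase SkipWhenNoWork(
        WorkerSystemBase workerSystem, Func<Task<bool>> hasWorkAsync)
    {
        return workerSystem.AddStartingCallback(async ws =>
        {
            bool hasWork = await hasWorkAsync();

            // NotCompleted continues normal processing; Succeeded skips
            // Root() and starting child workers, as described above.
            return hasWork
                ? ProgressStatus.NotCompleted
                : ProgressStatus.Succeeded;
        });
    }
}
```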
    Exceptions
    Type Condition
    ArgumentNullException

    startingFuncAsync

    InvalidOperationException

    Cannot add 'Starting' callback after the worker system has started running.

    Start()

    Starts the worker system synchronously. This will run any user added callbacks, the RunAsync() method, and any workers they add. With the out-of-box WorkerSystem, this includes any Root(Action<WorkerSystem>) overload callback.

    Note: Only call this method from the default SynchronizationContext, i.e. from console programs. The recommended way to start the worker system is via StartAsync(), which also works with all types of programs, including GUI, etc.

    Declaration
    public virtual SystemOutcomeStatus Start()
    Returns
    Type Description
    SystemOutcomeStatus

    A SystemOutcomeStatus that represents the completion of the worker system, i.e. when both RunAsync() and user callbacks have completed, and no more workers are running or can be started.

    Check the result for success or failure (a corresponding result is then also available in the Status property):

    Fatal: A worker or user code has thrown an unhandled exception, or Fatal has been explicitly returned from a worker. Note that a Fatal status from any worker will stop the whole worker system.
    Error: A worker is Error, and it and its ancestors have EscalateError set to true.
    Canceled: The worker system is canceled.
    Succeeded: All other cases.

    Exceptions
    Type Condition
    InvalidOperationException
    • Can only be used with the default (null) SynchronizationContext, e.g. from console programs, but not from frameworks that set a SynchronizationContext, e.g. WinForms, WPF, UWP, ASP.NET programs, and some testing frameworks.
    • The worker system can only be started once.

    StartAsync()

    Starts the worker system asynchronously, which is the recommended way. This will run any user added callbacks, the RunAsync() method, and any workers they add. With the out-of-box WorkerSystem, this includes any Root(Action<WorkerSystem>) overload callback.

    Also see Start().

    Declaration
    public virtual Task<SystemOutcomeStatus> StartAsync()
    Returns
    Type Description
    Task<SystemOutcomeStatus>

    A task whose SystemOutcomeStatus result represents the completion of the worker system, i.e. when both RunAsync() and user callbacks have completed, and no more workers are running or can be started.

    Await this task and check the result for success or failure (a corresponding result is then also available in the Status property):

    Fatal: A worker or user code has thrown an unhandled exception, or Fatal has been explicitly returned from a worker. Note that a Fatal status from any worker will stop the whole worker system.
    Error: A worker is Error, and it and its ancestors have EscalateError set to true.
    Canceled: The worker system is canceled.
    Succeeded: All other cases.

    Exceptions
    Type Condition
    InvalidOperationException

    The worker system can only be started once.
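
    A minimal sketch of awaiting StartAsync() and acting on the result follows. It takes an already configured worker system and uses only StartAsync() plus the inherited Name and IsSucceeded members; the class name, method name, and the exit-code convention are assumptions made for illustration.

```csharp
using System;
using System.Threading.Tasks;
using actionETL;

public static class StartAsyncSketch
{
    // Hypothetical helper: starts the worker system and maps the outcome
    // to a process exit code (0 = success, 1 = any failure or cancel).
    public static async Task<int> RunToExitCodeAsync(WorkerSystemBase workerSystem)
    {
        // Throws InvalidOperationException if the system was already started.
        SystemOutcomeStatus outcome = await workerSystem.StartAsync();

        Console.WriteLine($"Worker system '{workerSystem.Name}' finished: {outcome}");

        // A corresponding result is also available on the worker system itself.
        return workerSystem.IsSucceeded ? 0 : 1;
    }
}
```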

    TryCancel(String)

    Cancels the worker system (unless it has already completed or there are no more workers to run). Canceling stops the dispatching of workers: workers that have not yet started get their OutcomeStatus set to Canceled without being executed. Any workers that are already running will complete as normal.

    By default (i.e. with EscalateError set to true), failed workers will escalate any error to their parents, recursively, including to the WorkerSystem.

    Note: This method is thread-safe.

    Declaration
    public bool TryCancel(string message)
    Parameters
    Type Name Description
    String message

    The message. Can be empty or null.

    Returns
    Type Description
    Boolean

    true if canceling the worker system; false if already canceled or completed, or no more workers to run.
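
    Because TryCancel(String) is thread-safe, it can be called from an event handler running on another thread. The sketch below wires it to Ctrl+C in a console program; the class name, method name, and message text are hypothetical, while Console.CancelKeyPress is the standard .NET event.

```csharp
using System;
using actionETL;

public static class TryCancelSketch
{
    // Hypothetical helper: requests cancellation of the worker system
    // when the user presses Ctrl+C.
    public static void CancelOnCtrlC(WorkerSystemBase workerSystem)
    {
        Console.CancelKeyPress += (sender, e) =>
        {
            // Keep the process alive so already running workers can complete.
            e.Cancel = true;

            bool canceling = workerSystem.TryCancel("Ctrl+C pressed by user");
            Console.WriteLine(canceling
                ? "Cancel requested; waiting for running workers to complete."
                : "Worker system already canceled or completed.");
        };
    }
}
```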

    Implements

    IDisposeOnFinished
    Copyright © 2023 Envobi Ltd