    Class WorkerBase<TDerived>

    This class contains worker members that use the type of the final derived worker (TDerived). All workers inherit from this class, directly or indirectly. Derive from this class (or another more derived worker) when creating a new custom worker.

    To refer to workers where you don't know the final derived worker type, instead use WorkerBase.

    When starting a worker, the library calls RunAsync() during the worker Running phase. A derived class must override this abstract method, which normally contains the bulk of the worker logic.

    Note that the worker Running phase also includes additional places where logic can optionally be inserted via callbacks, to e.g. customize the initialization, cleanup, and error handling of existing workers. This is mostly used when customizing workers that are not designed to be derived from (i.e. without a "Base" suffix). See Worker Life-cycle for details.

    See Workers for how to use workers, and see Custom Workers for how to develop your own workers.

    Inheritance
    Object
    WorkerParent
    WorkerBase
    WorkerBase<TDerived>
    ActionWorkerBase<TDerived>
    AdbDataReaderSource<TOutput>
    AdbExecuteNonQueryTarget<TInputError>
    AdbExecuteNonQueryWorker
    AdbExecuteScalarWorker<TResult>
    AdbInsertTarget<TInputError>
    AdbTableNonQueryWorker
    AdbTransactionActionWorker
    AdbMySqlConnectorBulkInsertTarget<TInput>
    AdbSqlClientBulkInsertTarget<TInput>
    AggregateTransform<TInputAccumulateOutput>
    AggregateTransform<TInput, TAccumulateOutput>
    AggregateTransform<TInput, TAccumulate, TOutput>
    CollectionTarget<TInput, TCollection>
    CopyFileWorker
    CreateFileWorker
    DeleteFileWorker
    DictionaryLookupSplitTransform<TInputOutputError, TKey, TValue>
    DictionaryLookupSplitTransform<TInputOutputError, TDictionaryInput, TKey, TValue>
    DictionaryLookupTransform<TInputOutputError, TKey, TValue>
    DictionaryLookupTransform<TInputOutputError, TDictionaryInput, TKey, TValue>
    DictionaryTarget<TInput, TKey, TValue>
    EnumerableSource<TOutput>
    XlsxSource<TOutput>
    XlsxTarget<TInput>
    ExecuteProcessWorker
    FileExistsWorker
    FileHelperFileSource<TOutput>
    FileHelperFileTarget<TInput>
    FileHelperStreamSource<TOutput>
    FileHelperStreamTarget<TInput>
    FileHelperStringSource<TOutput>
    ForEachActionWorker<TItem>
    JoinMergeSortedTransformBase<TDerived, TLeftInput, TRightInput, TOutput>
    MergeSortedTransform<TInputOutput>
    MoveFileWorker
    MulticastTransform<TInputOutput>
    PortPassThroughSource<TOutput>
    PortPassThroughTarget<TInputOutput>
    RepeatRowsSource<TOutput>
    RowsActionSource<TOutput>
    RowsActionSource<TOutput, TError>
    RowsActionTarget<TInput>
    RowsActionTransform<TInputOutputError>
    RowsActionTransform<TInputError, TOutput>
    RowSourceBase<TDerived, TOutput>
    RowsSourceBase<TDerived, TOutput>
    RowsTargetBase<TDerived, TInput>
    RowsTransformBase<TDerived, TInput, TOutput>
    SortTransform<TInputOutput>
    SourceBase<TDerived, TOutput>
    SourceBase<TDerived, TOutput, TError>
    SplitTransform<TInputOutputError>
    TargetBase<TDerived, TInput>
    TargetBase<TDerived, TInput, TError>
    TransformBase<TDerived, TInput, TOutput>
    TrashTarget<TInput>
    TwoInputTransformBase<TDerived, TLeftInput, TRightInput, TOutput>
    UnionAllTransform<TInputOutput>
    UsingActionWorker<TDisposable>
    WhileActionWorker<T>
    Worker
    Implements
    IDisposeOnFinished
    Inherited Members
    WorkerBase.AddCompletedCallback(Func<WorkerBase, OutcomeStatus, Task<OutcomeStatus>>)
    WorkerBase.AddRanCallback(Func<WorkerBase, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>>)
    WorkerBase.AddStartingCallback(Func<WorkerBase, Task<ProgressStatus>>)
    WorkerBase.DefaultIsStartable()
    WorkerBase.ErroredPortErrorsWorkerProtected
    WorkerBase.ErrorOutputs
    WorkerBase.EscalateError
    WorkerBase.Inputs
    WorkerBase.IsStartable
    WorkerBase.Outputs
    WorkerBase.Parent
    WorkerBase.SucceededSequence<TLastWorker>(WorkerBase, WorkerBase, WorkerBase, WorkerBase, WorkerBase, TLastWorker)
    WorkerBase.SucceededSequence<TLastWorker>(WorkerBase, WorkerBase, WorkerBase, WorkerBase, TLastWorker)
    WorkerBase.SucceededSequence<TLastWorker>(WorkerBase, WorkerBase, WorkerBase, TLastWorker)
    WorkerBase.SucceededSequence<TLastWorker>(WorkerBase, WorkerBase, TLastWorker)
    WorkerBase.SucceededSequence<TLastWorker>(WorkerBase, TLastWorker)
    WorkerBase.SucceededSequence<TLastWorker>(TLastWorker)
    WorkerParent.AddChildCompletedCallback(Action<WorkerBase>)
    WorkerParent.AddStartingChildrenCallback(Func<WorkerParent, Task<ProgressStatus>>)
    WorkerParent.Children
    WorkerParent.DisposeOnFinished<TDisposable>(TDisposable)
    WorkerParent.GetDownstreamFactory<TInput>()
    WorkerParent.HasChildren
    WorkerParent.IsCanceled
    WorkerParent.IsCompleted
    WorkerParent.IsCreated
    WorkerParent.IsError
    WorkerParent.IsFailed
    WorkerParent.IsFatal
    WorkerParent.IsRunning
    WorkerParent.IsSucceeded
    WorkerParent.KeepChildrenLevels
    WorkerParent.Locator
    WorkerParent.LogFactory
    WorkerParent.Logger
    WorkerParent.MaxRunningChildren
    WorkerParent.Name
    WorkerParent.RemoveChildren()
    WorkerParent.RescheduleChildren()
    WorkerParent.RunAsync()
    WorkerParent.RunChildrenAsync(Boolean)
    WorkerParent.RunChildrenAsync()
    WorkerParent.Status
    WorkerParent.Item[String]
    WorkerParent.ToString()
    WorkerParent.WorkerSystem
    WorkerParent.DebugCommands
    WorkerParent.AggregateErrorOutputRows
    WorkerParent.AggregateOutputRows
    WorkerParent.AggregateWorkersCompleted
    WorkerParent.InstantCompleted
    WorkerParent.InstantCreated
    WorkerParent.InstantStarted
    WorkerParent.RunningDuration
    Namespace: actionETL
    Assembly: actionETL.dll
    Syntax
    public abstract class WorkerBase<TDerived> : WorkerBase, IDisposeOnFinished where TDerived : WorkerBase<TDerived>
    Type Parameters
    Name Description
    TDerived

    The type of the derived worker, e.g.:

    public class MyWorker
        : WorkerBase<MyWorker>
    { 
        // ...
    }
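    The self-referencing constraint (where TDerived : WorkerBase<TDerived>) is what allows base-class methods to return the final derived type, so chained calls keep access to members of the derived worker. The following is a minimal, self-contained sketch of that pattern. FluentBase<TDerived>, DemoWorker and CrtpDemo are hypothetical stand-ins for illustration and are not part of actionETL:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a self-referencing generic base class.
public abstract class FluentBase<TDerived> where TDerived : FluentBase<TDerived>
{
    private readonly List<Action<TDerived>> _callbacks = new List<Action<TDerived>>();

    // Returns TDerived rather than the base type, so chained calls keep
    // access to members declared on the derived class.
    public TDerived AddCallback(Action<TDerived> callback)
    {
        if (callback == null) throw new ArgumentNullException(nameof(callback));
        _callbacks.Add(callback);
        return (TDerived)this;
    }

    // Invokes the callbacks in the order they were added.
    public void Run()
    {
        foreach (var callback in _callbacks)
            callback((TDerived)this);
    }
}

public sealed class DemoWorker : FluentBase<DemoWorker>
{
    public int BatchSize { get; set; }
}

public static class CrtpDemo
{
    public static int Run()
    {
        var worker = new DemoWorker()
            .AddCallback(w => w.BatchSize = 100)   // w is typed as DemoWorker
            .AddCallback(w => w.BatchSize *= 2);
        worker.Run();
        return worker.BatchSize;
    }
}
```

    Without the TDerived parameter, AddCallback would have to return the base type, and the lambdas above could not reach BatchSize without a cast.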

    Constructors

    WorkerBase(WorkerParent)

    Initializes a new instance of the WorkerBase<TDerived> abstract worker. All workers inherit from this class, directly or indirectly. Also see WorkerBase.

    The worker name is generated automatically from the worker type and an increasing number starting at 1.

    Declaration
    protected WorkerBase(WorkerParent workerParent)
    Parameters
    Type Name Description
    WorkerParent workerParent

    The parent worker or worker system that the new child worker will be added to. Cannot be null.

    Exceptions
    Type Condition
    ArgumentNullException

    workerParent - All workers must have a parent. The top level workers have the worker system as parent.

    InvalidOperationException
    • Cannot add child worker to parent which has completed. Are you adding it to the correct parent?
    • Cannot add worker to parent, since its children have been started. Are you adding it to the correct parent?

    WorkerBase(WorkerParent, String)

    Initializes a new instance of the WorkerBase<TDerived> abstract worker. All workers inherit from this class, directly or indirectly. Also see WorkerBase.

    Declaration
    protected WorkerBase(WorkerParent workerParent, string workerName)
    Parameters
    Type Name Description
    WorkerParent workerParent

    The parent worker or worker system that the new child worker will be added to. Cannot be null.

    String workerName

    Name of the worker.

    Set to a prefix plus a trailing "/" (e.g. "MyPrefix-/") to generate a unique name from the prefix plus an increasing number starting at 1.

    While less useful, set to null, whitespace or "/" to generate a unique name from the worker type plus an increasing number starting at 1.

    The name cannot otherwise contain "/", and cannot start with double underscore "__".

    Exceptions
    Type Condition
    ArgumentException

    workerName:

    • Workers with the same parent must have unique names.
    • Worker and worker system names cannot contain '/' or start with double underscore '__'.
    ArgumentNullException

    workerParent - All workers must have a parent. The top level workers have the worker system as parent.

    InvalidOperationException
    • Cannot add child worker to parent which has completed. Are you adding it to the correct parent?
    • Cannot add worker to parent, since its children have been started. Are you adding it to the correct parent?

    WorkerBase(WorkerParent, String, Func<Boolean>)

    Initializes a new instance of the WorkerBase<TDerived> abstract worker. All workers inherit from this class, directly or indirectly. Also see WorkerBase.

    Declaration
    protected WorkerBase(WorkerParent workerParent, string workerName, Func<bool> isStartableFunc)
    Parameters
    Type Name Description
    WorkerParent workerParent

    The parent worker or worker system that the new child worker will be added to. Cannot be null.

    String workerName

    Name of the worker.

    Set to a prefix plus a trailing "/" (e.g. "MyPrefix-/") to generate a unique name from the prefix plus an increasing number starting at 1.

    While less useful, set to null, whitespace or "/" to generate a unique name from the worker type plus an increasing number starting at 1.

    The name cannot otherwise contain "/", and cannot start with double underscore "__".

    Func<Boolean> isStartableFunc

    Function to calculate the worker start constraint; it should return true for startable and false for not startable. Defaults to startable if null. See Worker Execution for more details.
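    As a sketch of how these parameters are typically forwarded, the hypothetical intermediate base worker below exposes the three protected constructors to its own derived workers. MyStepWorkerBase<TDerived> is an illustrative name, not an actionETL type. It is declared abstract, so it does not need to override RunAsync() itself:

```csharp
using System;
using actionETL;

// Hypothetical intermediate base worker. The "Base" suffix signals that it is
// meant to be derived from. It stays abstract, so RunAsync() is left to the
// final derived worker.
public abstract class MyStepWorkerBase<TDerived> : WorkerBase<TDerived>
    where TDerived : MyStepWorkerBase<TDerived>
{
    // The name is generated from the worker type plus an increasing number.
    protected MyStepWorkerBase(WorkerParent workerParent)
        : base(workerParent)
    {
    }

    // Pass e.g. "MyPrefix-/" to generate a unique name from the prefix.
    protected MyStepWorkerBase(WorkerParent workerParent, string workerName)
        : base(workerParent, workerName)
    {
    }

    // isStartableFunc may be null, in which case the worker defaults to startable.
    protected MyStepWorkerBase(WorkerParent workerParent, string workerName,
        Func<bool> isStartableFunc)
        : base(workerParent, workerName, isStartableFunc)
    {
    }
}
```

    A final derived worker with a matching public constructor could then be created with, for example, new MyWorker(parent, "Load-/", () => extract.IsSucceeded), where MyWorker, parent and extract are likewise hypothetical.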

    Exceptions
    Type Condition
    ArgumentException

    workerName:

    • Workers with the same parent must have unique names.
    • Worker and worker system names cannot contain '/' or start with double underscore '__'.
    ArgumentNullException

    workerParent - All workers must have a parent. The top level workers have the worker system as parent.

    InvalidOperationException
    • Cannot add child worker to parent which has completed. Are you adding it to the correct parent?
    • Cannot add worker to parent, since its children have been started. Are you adding it to the correct parent?
    InvalidOperationException

    License failure. Maximum number of worker types exceeded. Please switch to a license that supports more or unlimited worker types, or use fewer worker types in this worker system.

    Methods

    AddCompletedCallback(Func<TDerived, OutcomeStatus, Task<OutcomeStatus>>)

    Adds a callback which will be called as the last step of the worker Running phase, which is after any children and ports have completed. Multiple callbacks can be added.

    These callbacks are useful for performing cleanup tasks. Also see DisposeOnFinished<TDisposable>(TDisposable) and UsingActionWorker<TDisposable> which are specifically for disposing resources.

    If the worker is started, these callbacks will run, even if earlier callbacks, RunAsync(), or any children have failures. If multiple callbacks have been added, they will all be called (even if one fails) in the reverse of the order in which they were added, e.g. callbacks from derived classes will be called before callbacks from base classes.

    Each callback takes the worker itself and the OutcomeStatus from previous steps (including any children) as parameters.

    Note: Adding the callback is thread-safe.
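    The ordering rule can be pictured with a small stand-alone model. The CallbackOrderDemo class below is purely illustrative and does not use or reproduce actionETL internals. It only demonstrates "run every callback, last added first, and keep going if one fails":

```csharp
using System;
using System.Collections.Generic;

public static class CallbackOrderDemo
{
    // Runs all callbacks in reverse registration order and keeps going
    // after a failure. Returns the names of the callbacks that were invoked.
    public static List<string> RunReversed(IReadOnlyList<(string Name, Action Body)> callbacks)
    {
        var invoked = new List<string>();
        for (int i = callbacks.Count - 1; i >= 0; i--)
        {
            invoked.Add(callbacks[i].Name);
            try
            {
                callbacks[i].Body();
            }
            catch (Exception)
            {
                // A real system would record the failure. The point here is
                // only that the remaining callbacks still run.
            }
        }
        return invoked;
    }

    public static string Run()
    {
        var callbacks = new List<(string Name, Action Body)>
        {
            // Added first, e.g. by a base class.
            ("base", (Action)(() => { })),
            // Added last, e.g. by a derived class; this one fails.
            ("derived", (Action)(() => throw new InvalidOperationException())),
        };
        return string.Join(",", RunReversed(callbacks));
    }
}
```

    In this model the "derived" callback runs first and throws, and the "base" callback still runs afterwards.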

    Declaration
    public TDerived AddCompletedCallback(Func<TDerived, OutcomeStatus, Task<OutcomeStatus>> completedFuncAsync)
    Parameters
    Type Name Description
    Func<TDerived, OutcomeStatus, Task<OutcomeStatus>> completedFuncAsync

    The asynchronous callback that will be called when the worker has completed. See "Remarks" below for details.

    Returns
    Type Description
    TDerived

    The instance itself, so you can chain multiple calls.

    Remarks

    The callback takes two parameters:

    TDerived - The worker itself
    OutcomeStatus - From previous execution steps (including any children)

    The callback implementation is either synchronous and returns a Task<OutcomeStatus> explicitly, or is async and returns an OutcomeStatus.

    • To leave the OutcomeStatus unchanged, return the OutcomeStatus that was passed in as a parameter (and use ToTask() in synchronous callbacks).
    • To change the OutcomeStatus (e.g. changing Error to Succeeded), return a new OutcomeStatus instance. If appropriate, use Combine(Boolean, OutcomeStatus) to also retain the previous information, both of which the system will log.

    If you only have access to the WorkerBase type, use AddCompletedCallback(Func<WorkerBase, OutcomeStatus, Task<OutcomeStatus>>) instead.

    Examples

    In this example we roll back a transaction on failure:

    worker.AddCompletedCallback((w, outcomeStatus) =>
    {
        if (!outcomeStatus.IsSucceeded)
            transaction?.Rollback();
        return outcomeStatus.ToTask();
    });
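    The same kind of callback can also be written in the async form, where the lambda returns an OutcomeStatus directly and ToTask() is not needed. This is a sketch only: worker is assumed to be an existing worker instance, and ReleaseLockAsync is a hypothetical helper standing in for any awaitable cleanup work:

```csharp
using System.Threading.Tasks;
using actionETL;

// Hypothetical awaitable cleanup helper; replace with your own logic.
static Task ReleaseLockAsync(string workerName) => Task.CompletedTask;

worker.AddCompletedCallback(async (w, outcomeStatus) =>
{
    // Runs as the last step of the Running phase, also after failures.
    await ReleaseLockAsync(w.Name).ConfigureAwait(false);

    // Return the incoming status unchanged.
    return outcomeStatus;
});
```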
    Exceptions
    Type Condition
    ArgumentNullException

    completedFuncAsync

    InvalidOperationException

    Cannot add 'Completed' callback after the worker has completed.

    AddRanCallback(Func<TDerived, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>>)

    Adds a callback which will be called immediately after RunAsync() finishes, e.g. for performing cleanup.

    Note that after these callbacks return, ports will be checked for any error status or not being completed, and child workers will be run (if not already run explicitly, or the worker RunAsync() failed), both of which can fail the current worker.

    If multiple callbacks have been added, they will all be called (even if one fails) in the reverse of the order in which they were added, e.g. callbacks from derived classes will be called before callbacks from base classes.

    If RunAsync() does not get called (due to an AddStartingCallback() callback), AddRanCallback() callbacks will not run.

    Each callback takes the worker itself, the OutcomeStatus from RunAsync(), and WorkerParentChildrenState as parameters.

    Note: Adding the callback is thread-safe.

    Declaration
    public TDerived AddRanCallback(Func<TDerived, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>> ranFuncAsync)
    Parameters
    Type Name Description
    Func<TDerived, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>> ranFuncAsync

    The asynchronous callback that will be called immediately after RunAsync() finishes. See "Remarks" below for details.

    Returns
    Type Description
    TDerived

    The instance itself, so you can chain multiple calls.

    Remarks

    The callback takes three parameters:

    TDerived - The worker itself
    OutcomeStatus - The RunAsync() return value
    WorkerParentChildrenState - Describes whether children have already been added or completed. If completed, you must call RemoveChildren() before adding any new child workers. You can also call a RunChildrenAsync() overload, but must then also await its completion.

    The callback implementation is either synchronous and returns a Task<OutcomeStatus> explicitly, or is async and returns an OutcomeStatus.

    • To leave the OutcomeStatus unchanged, return the OutcomeStatus that was passed in as a parameter (and use ToTask() in synchronous callbacks).
    • To change the OutcomeStatus (e.g. changing Error to Succeeded), return a new OutcomeStatus instance. If appropriate, use Combine(Boolean, OutcomeStatus) to also retain the previous information, both of which the system will log.

    If you only have access to a WorkerBase type, use AddRanCallback(Func<WorkerBase, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>>) instead.

    Examples

    In this example we roll back a transaction on failure:

    worker.AddRanCallback((w, outcomeStatus, childrenState) =>
    {
        if (!outcomeStatus.IsSucceeded)
            transaction?.Rollback();
        return outcomeStatus.ToTask();
    });
    Exceptions
    Type Condition
    ArgumentNullException

    ranFuncAsync

    InvalidOperationException

    Cannot add 'Ran' callback after the worker has completed.

    AddStartingCallback(Func<TDerived, Task<ProgressStatus>>)

    Adds a callback which will be called as the first step of the worker Running phase, which is before RunAsync() is called. Multiple callbacks can be added.

    These callbacks are often used for initialization, or for adding child workers, e.g. when not having access to the worker implementation.

    If the worker is started, these callbacks will run, even if some fail, in the order they were added, e.g. callbacks added in base classes will be called before callbacks added in derived classes.

    Each callback takes the worker itself as a parameter.

    Note: Adding the callback is thread-safe.

    Declaration
    public TDerived AddStartingCallback(Func<TDerived, Task<ProgressStatus>> startingFuncAsync)
    Parameters
    Type Name Description
    Func<TDerived, Task<ProgressStatus>> startingFuncAsync

    The asynchronous callback that will be called when the worker has started. See "Remarks" below for details.

    Returns
    Type Description
    TDerived

    The instance itself, so you can chain multiple calls.

    Remarks

    The callback takes the worker itself as a parameter.

    The callback implementation is either synchronous and returns a Task<ProgressStatus> explicitly, or is async and returns a ProgressStatus.

    The callback should return:

    • ProgressStatus.NotCompletedTask (or ProgressStatus.NotCompleted) to continue normal processing.
    • ProgressStatus.SucceededTask (or ProgressStatus.Succeeded) to silently empty any input ports and transmit SendSucceeded() to any output ports. RunAsync(), AddRanCallback() and AddStartingChildrenCallback() callbacks will not be called, and child workers will not be started. All other AddStartingCallback(), AddCompletedCallback() and parent AddChildCompletedCallback(Action<WorkerBase>) callbacks will be called.
    • A failure ProgressStatus on failure, which will give the worker that failure status. Neither RunAsync() nor AddRanCallback() callbacks will be called, and child workers will not be started. All other AddStartingCallback(), AddCompletedCallback() and parent AddChildCompletedCallback() callbacks will however be called.

    If you only have access to the WorkerBase type, use AddStartingCallback(Func<WorkerBase, Task<ProgressStatus>>) instead.

    Examples

    In this example we allocate a row buffer:

    worker.AddStartingCallback(w =>
    {
        _outputRows = new TOutput[Output.BufferCapacity];
        return ProgressStatus.NotCompletedTask;
    });
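    A starting callback can also be used to skip the worker when there is nothing to do, using the return values listed under "Remarks". In this sketch, worker is assumed to be an existing worker instance and inputPath is a hypothetical path chosen for illustration:

```csharp
using System.IO;
using actionETL;

// Hypothetical input path, used only for this illustration.
const string inputPath = "incoming/orders.csv";

worker.AddStartingCallback(w =>
{
    // Nothing to process: complete the worker as succeeded without calling
    // RunAsync() or starting any child workers.
    if (!File.Exists(inputPath))
        return ProgressStatus.SucceededTask;

    // Otherwise continue with normal processing.
    return ProgressStatus.NotCompletedTask;
});
```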
    Exceptions
    Type Condition
    ArgumentNullException

    startingFuncAsync

    InvalidOperationException

    Cannot add 'Starting' callback after the worker has started running.

    Implements

    IDisposeOnFinished
    Copyright © 2021 Envobi Ltd