

    Worker Execution

    Workers are instantiated and executed in a parent-child hierarchy with a WorkerSystem instance at the root of the hierarchy, and with an arbitrary number of descendant workers and levels (see the Worker System and Adding and Grouping Workers articles for more on worker creation).

    Execution Rules

    After the user starts the worker system with StartAsync() or Start(), the worker system runs any code that has been added to it (typically with one of the Root() callback overloads). That code can create workers, which are started automatically, and those workers can in turn create child workers, and so on.

    The execution order therefore starts with the worker system, followed by the worker hierarchy from top to bottom, while obeying any added explicit constraints (maximum worker children running and start constraints between workers) as well as implicit dataflow link constraints.

    E.g., in this worker system:

    Worker Hierarchy

    • Each of the five rectangles contained inside the WorkerSystem rectangle represents a distinct worker instance
    • Execution order and duration are:
      1. WorkerSystem, until all children have either completed or cannot be started
      2. MoveFileWorker. If successful:
      3. ActionWorker, until its three children have completed
      4. ExecuteProcessWorker, FileHelperFileSource and AdbInsertTarget in parallel (with AdbInsertTarget delayed until FileHelperFileSource starts sending data)

    The detailed rules for all worker systems are as follows:

    • A worker system instance can only run once. To perform the same work again, a new worker system must be created (optionally with different workers and settings).
    • A worker instance can only run once. To perform the same work again, a new worker must be added to the worker system (optionally with different settings).
    • The library user is responsible for starting the worker system by calling StartAsync() or Start() on the worker system.
    • The system is responsible for starting workers. It will keep attempting to start the children for a particular parent, until either all its children have completed, or no more children can be started due to start constraints (see below).
    Important

    By default, workers under a parent are only started after the worker creation code (e.g. added with WorkerSystem.Root()) has fully completed. This behavior can be modified with RunChildrenAsync().
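    The rule that the system keeps attempting to start the children of a parent can be pictured as a loop. The following is a minimal, self-contained C# model of that rule only, not actionETL's scheduler: the children are hypothetical (name, start constraint) pairs that complete instantly, one pass at a time, whereas real workers run concurrently and constraints are re-evaluated on events.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model, not actionETL code: each child is a name plus a start
// constraint that may look at which siblings have completed.
var completed = new List<string>();
var unstarted = new List<(string Name, Func<bool> IsStartable)>
{
    ("Load", () => completed.Contains("Extract")),
    ("Extract", () => true),
    ("Archive", () => completed.Contains("Notify")), // "Notify" never exists
};

bool startedAny;
do
{
    // Re-evaluate the start constraints of all children not yet started
    var startable = unstarted.Where(child => child.IsStartable()).ToList();
    startedAny = startable.Count > 0;
    foreach (var child in startable)
    {
        unstarted.Remove(child);
        completed.Add(child.Name); // in this model a child completes instantly
    }
} while (startedAny && unstarted.Count > 0);

// Stops once all children have completed or none of the rest can be started;
// "Archive" is left in the Created state
Console.WriteLine("Completed: " + string.Join(", ", completed));
Console.WriteLine("Never started: " + string.Join(", ", unstarted.Select(c => c.Name)));
```

    Note how the order in which children were added does not matter here; only their start constraints do.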

    In the documentation, the workers, constraints, and dataflow links are depicted as follows:

    Worker Legend

    Start Constraints

    Start constraints (a.k.a. dependencies) control whether a particular worker can be started or not. If any one constraint blocks the worker from being started, the worker remains in the Created state.

    For each parent worker (and similarly for the worker system), the start constraints of all unstarted children are (re)evaluated:

    • Automatically when:
      • The RunAsync() method of the parent worker or worker system has completed (which includes the WorkerSystem.Root() callback)
      • The parent's RunChildrenAsync() is called (e.g. to implement looping)
      • Any child worker changes Status
      • The first input port (if any) of any child worker goes beyond the Ready state
      • A start constraint property (IsStartable or MaxRunningChildren) on any child worker is set to a new value
    • Manually when RescheduleChildren() is called on the parent. This is rarely needed; in virtually all cases the above automatic behavior is sufficient.

    There are two start constraints available, covered next: IsStartable and MaxRunningChildren.

    WorkerBase.IsStartable

    IsStartable is a callback that can block a worker from being started. Set this (via a constructor parameter or the property itself) to define a custom start constraint. After the worker has started, this property is ignored.

    By default:

    • Workers without input ports (i.e. non-dataflow workers and dataflow sources) are always startable
    • Workers with input ports (i.e. transforms and targets) are blocked until at least one input has gone beyond the Ready state (i.e. an upstream worker sends data or port completion, which sets Inputs.AnyActivated to true)

    This default behavior is implemented in the DefaultIsStartable() method, which you can use to extend the default behavior instead of completely replacing it, e.g.:

    worker.IsStartable = () => worker.DefaultIsStartable() && otherWorker.IsCompleted;
    

    The IsStartable callback (e.g. an anonymous function like above, or a method) should return false to block the worker, or true to allow it to be started. Set it to null to get the default behavior.
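    To make the default rule and its extension concrete, here is a small self-contained C# model. It is not actionETL code: the local functions and flags are hypothetical stand-ins for DefaultIsStartable(), Inputs.AnyActivated and otherWorker.IsCompleted, for a target worker with one input port.

```csharp
using System;

// Hypothetical model, not actionETL code, of a target worker with one input port.
int inputPortCount = 1;
bool anyInputActivated = false;    // stands in for Inputs.AnyActivated
bool otherWorkerCompleted = false; // stands in for otherWorker.IsCompleted

// Default rule: no input ports => startable, else wait for an activated input
bool DefaultIsStartable() => inputPortCount == 0 || anyInputActivated;

// A null callback means "use the default behavior"
bool Evaluate(Func<bool> isStartable) =>
    isStartable == null ? DefaultIsStartable() : isStartable();

// Extend, rather than replace, the default behavior
Func<bool> extended = () => DefaultIsStartable() && otherWorkerCompleted;

bool beforeInput = Evaluate(extended);   // no input activated yet
anyInputActivated = true;
bool beforeOther = Evaluate(extended);   // input activated, other worker running
otherWorkerCompleted = true;
bool allowed = Evaluate(extended);       // both conditions met
bool defaultOnly = Evaluate(null);       // null falls back to the default

Console.WriteLine($"{beforeInput} {beforeOther} {allowed} {defaultOnly}");
```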

    Important
    • To avoid dataflow deadlocks, all dataflow source workers that are linked to each other via ports (normally indirectly via transforms and targets) will be started as a group, but only when all of them are startable according to their (default or custom) start constraints.
    • If a dataflow worker starts, all other dataflow workers linked (directly and indirectly) to it via ports MUST be allowed to start at some point. Otherwise some of them (and the whole worker system) can hang forever, waiting on input ports that never complete or being unable to send rows downstream.
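    The group start rule for linked sources can be sketched as an "all or nothing" check. This is a self-contained C# model, not actionETL code; the source names and the databaseReady flag are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model, not actionETL code: two dataflow sources linked
// (indirectly, via a transform) to the same target form one start group.
bool databaseReady = false;
var linkedSources = new Dictionary<string, Func<bool>>
{
    ["FileSource"] = () => true,          // default: sources are startable
    ["DbSource"] = () => databaseReady,   // custom start constraint
};

// The group only starts when every member is startable; starting just one
// source could leave the shared target waiting forever on its other input.
bool CanStartGroup() => linkedSources.Values.All(isStartable => isStartable());

bool startedEarly = CanStartGroup();  // DbSource blocked: no source starts
databaseReady = true;
bool startedLater = CanStartGroup();  // all startable: both start together
Console.WriteLine($"{startedEarly} {startedLater}");
```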

    For further details on setting dataflow constraints, please see Dataflow Blocking and Row Buffering.

    Note
    • Other limits (i.e. MaxRunningChildren) can also stop a worker from being started.
    • Automatic evaluation of start constraints only happens on changes in the parent and the worker's siblings, not on changes in any other workers. It is therefore best practice to only reference the parent and sibling workers when setting IsStartable.

    Example

    // No constraints, will be started immediately
    var step1 = new ActionWorker(ws, "Step 1", aw =>
    {
        // ...
    });
    
    // Only start "Step 2" if "Step 1" completed successfully
    var step2 = new ActionWorker(ws, "Step 2", aw =>
    {
        // ...
    }
    , () => step1.IsSucceeded // This parameter sets the IsStartable property
    );
    
    // Only start "Step 3" if either "Step 1" OR "Step 2" has
    // completed (with any status)
    var step3 = new ActionWorker(ws, "Step 3", aw =>
    {
        // ...
    }
    , () => step1.IsCompleted || step2.IsCompleted
    );
    

    SucceededSequence()

    Instead of explicitly setting IsStartable constraints that check IsSucceeded on multiple sequential workers, you can call a SucceededSequence<TLastWorker>(TLastWorker) overload to accomplish the same thing:

    var lastWorkerInSequence = new ActionWorker(/* ... */).SucceededSequence(
          new AdbTableNonQueryWorker(/* ... */)
        , new ActionWorker(/* ... */)
        );
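    The constraints that such a sequence sets up can be modeled in a few lines. This self-contained C# sketch is not actionETL code; it assumes the worker the method is called on comes first, followed by the argument workers in order, and the worker names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model, not actionETL code, of the constraints a succeeded
// sequence sets up: each worker after the first may only start once the
// previous worker has succeeded.
var succeeded = new HashSet<string>();
string[] sequence = { "Truncate", "Load", "Report" };
var isStartable = new Dictionary<string, Func<bool>>
{
    [sequence[0]] = () => true, // the first worker gets no extra constraint here
};

for (int i = 1; i < sequence.Length; i++)
{
    string previous = sequence[i - 1]; // per-iteration copy for the closure
    isStartable[sequence[i]] = () => succeeded.Contains(previous);
}

string lastWorker = sequence[sequence.Length - 1]; // the call returns the last worker

succeeded.Add("Truncate");
bool loadCanStart = isStartable["Load"]();     // "Truncate" succeeded
bool reportCanStart = isStartable["Report"](); // "Load" has not succeeded yet
Console.WriteLine($"{lastWorker}: {loadCanStart} {reportCanStart}");
```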
    

    WorkerParent.MaxRunningChildren

    MaxRunningChildren sets the maximum number of children for a particular parent that can be running simultaneously. When this limit is reached, no new children will be started until the number of running children is lower than the value of this property.

    Note however that to avoid scheduling deadlocks, workers with input ports, i.e. transforms and targets, will be started irrespective of this setting. Transforms and targets still count towards the number of running children, and can therefore contribute to blocking non-dataflow workers and sources from being started.

    The default value is -1, i.e. unlimited.

    This property is typically used to limit the aggregate resource consumption (number of database connections, CPU, memory, ...) of a parent. Note that it only limits the number of direct children started, not grandchildren etc.
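    The MaxRunningChildren check described above can be summarized as a single predicate. This is a self-contained C# model, not actionETL code: CanStartChild is a hypothetical function, and it models only this constraint, ignoring IsStartable.

```csharp
using System;

// Hypothetical model, not actionETL code, of the MaxRunningChildren check
// for one parent; only this constraint is modeled (IsStartable is ignored).
bool CanStartChild(int maxRunningChildren, int runningChildren, bool hasInputPorts)
{
    // Transforms and targets (children with input ports) are started
    // regardless of the limit, to avoid scheduling deadlocks
    if (hasInputPorts) return true;

    // -1 (the default) means unlimited
    return maxRunningChildren == -1 || runningChildren < maxRunningChildren;
}

bool unlimited = CanStartChild(-1, 100, hasInputPorts: false);  // allowed
bool belowLimit = CanStartChild(2, 1, hasInputPorts: false);    // allowed
// Running transforms and targets also count towards runningChildren, so
// they can block a new source or non-dataflow worker at the limit
bool atLimit = CanStartChild(2, 2, hasInputPorts: false);       // blocked
bool targetAtLimit = CanStartChild(2, 2, hasInputPorts: true);  // allowed anyway

Console.WriteLine($"{unlimited} {belowLimit} {atLimit} {targetAtLimit}");
```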

    Important
    • Avoid setting the limit too low, especially with dataflow children, since that could lead to the worker system hanging.
    • When setting MaxRunningChildren, it is often useful to group related workers under an ActionWorker so that they are all enabled at the same time.

    Extension Callbacks

    Users can extend any existing worker or worker system instance by adding callbacks (as opposed to instantiating additional workers or creating Custom Workers or custom worker systems).

    The callbacks will be called at various points throughout the instance execution. Unlike instantiating additional workers, these callbacks are guaranteed to only run when the worker or worker system has actually been started. Callbacks can be useful on a worker or worker system instance for:

    • Performing custom setup and cleanup
    • Checking and potentially modifying the success or failure status of an execution step
    • Sending notifications or counting invocations
    • Executing code when all child workers start or when each child worker completes

    Callbacks are added with the following methods; see their documentation and Detailed Life-cycle below for further information:

    • On worker systems:
      • AddStartingCallback(Func<TDerived, Task<ProgressStatus>>)
      • AddCompletedCallback(Func<TDerived, OutcomeStatus, Task<OutcomeStatus>>)
    • On workers and worker systems:
      • AddStartingChildrenCallback(Func<WorkerParent, Task<ProgressStatus>>)
      • AddChildCompletedCallback(Action<WorkerBase>)
    • On workers:
      • AddStartingCallback(Func<WorkerBase, Task<ProgressStatus>>)
      • AddRanCallback(Func<WorkerBase, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>>)
      • AddCompletedCallback(Func<WorkerBase, OutcomeStatus, Task<OutcomeStatus>>)

    Status Property

    The worker system and each worker have a Status property (of type WorkerParentStatus) that describes the current state:

    • Created - Has been instantiated, e.g. new ActionWorker(...)
    • Running - Has been started
    • Succeeded - Successful completion
    • Canceled - Completed prematurely due to cancellation
    • Error - Completed with one or more errors
    • Fatal - Completed with one or more fatal (i.e. unrecoverable) errors

    Worker Error Handling has more details on unsuccessful outcomes.
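    The difference between the IsCompleted and IsSucceeded checks used in the Start Constraints example above follows from these states. The C# model below is self-contained and is not actionETL code: strings stand in for the WorkerParentStatus values, and the two local functions are hypothetical equivalents of those checks.

```csharp
using System;
using System.Linq;

// Hypothetical model, not actionETL code: strings stand in for the
// WorkerParentStatus values. The last four are the completed states.
string[] allStates = { "Created", "Running", "Succeeded", "Canceled", "Error", "Fatal" };
string[] completedStates = { "Succeeded", "Canceled", "Error", "Fatal" };

bool IsCompleted(string status) => completedStates.Contains(status);
bool IsSucceeded(string status) => status == "Succeeded";

foreach (string status in allStates)
{
    Console.WriteLine($"{status,-10} completed: {IsCompleted(status),-5} succeeded: {IsSucceeded(status)}");
}
```

    For example, a worker that ends in Error satisfies an IsCompleted constraint but not an IsSucceeded one.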

    Detailed Life-cycle

    This section describes all the possible execution steps and callbacks of the worker system and its workers. It is normally not required reading, but can be useful to understand when performing low-level development.

    Worker Life-cycle

    All workers have a RunAsync() method that the system calls (during the Running phase of the worker), which typically contains all the worker-specific logic, including initialization and cleanup. The Running phase does, however, include additional places where logic can optionally be inserted, both at compile time and at runtime, to e.g. customize the initialization, cleanup, and error handling of existing workers.

    The following is a detailed description of the worker life-cycle:

    Created

    1: The library user code creates the worker, e.g. new Worker(parent, "No-op");

    Running

    The worker is started by the system, which will:

    2: Change any output and error output ports' State from Created to Ready

    3: Call and await any callbacks that have been added with AddStartingCallback(Func<WorkerBase, Task<ProgressStatus>>). If any callback returned a failure status, skip to step 8. Otherwise, if any callback returned a Succeeded status, empty any input ports, successfully complete any output ports, and skip to step 8.

    4: Call and await the RunAsync() method, which is normally where the bulk of the worker logic resides

    5: Call and await any callbacks that have been added with AddRanCallback(Func<WorkerBase, OutcomeStatus, WorkerParentChildrenState, Task<OutcomeStatus>>), passing in the OutcomeStatus the previous step returned (normally Succeeded) and the WorkerParentChildrenState.

    6: Fail the worker if there are any active input or output ports, and empty the input ports. Successfully complete any error output ports.

    7: If no errors above, call and await RunChildrenAsync() to run any worker children unless the library user has already called it

    • You can call RunChildrenAsync() in RunAsync() or via AddRanCallback(), but then you must also await its completion.
    • RunChildrenAsync() also calls and incorporates the result of any callbacks added with AddStartingChildrenCallback(Func<WorkerParent, Task<ProgressStatus>>).

    8: Call and await any callbacks that have been added with AddCompletedCallback(Func<WorkerBase, OutcomeStatus, Task<OutcomeStatus>>), passing in the last OutcomeStatus of the previous steps (normally Succeeded)

    9: Set the worker's final Status to the last status from the previous steps, by default Succeeded

    Note

    Callbacks in steps 5 and 8 take the current status as input, and can change it (unless it's Fatal), e.g. from Error to Succeeded when handling a known error.
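    The Running phase steps and the status rule in this note can be condensed into a short model. This self-contained C# sketch is not actionETL code: statuses are strings, "Continue" is a made-up value meaning "no final status yet", the WorkerParentChildrenState argument and all port handling are omitted, and children are assumed to succeed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, simplified model (not actionETL code) of steps 3 to 9 of the
// Running phase. Statuses are strings; "Continue" is a made-up value meaning
// "no final status yet". Port handling (steps 2, 3, 6) is omitted.
var log = new List<string>();

async Task<string> RunWorkerAsync(
    Func<Task<string>> startingCallback,           // step 3
    Func<Task<string>> runAsync,                   // step 4
    Func<string, Task<string>> ranCallback,        // step 5
    Func<string, Task<string>> completedCallback)  // step 8
{
    string status = await startingCallback();
    if (status == "Continue") // a Succeeded or failure status skips to step 8
    {
        status = await runAsync();
        status = Apply(status, await ranCallback(status));
        if (status == "Succeeded")
        {
            log.Add("RunChildrenAsync"); // step 7, children assumed to succeed
        }
    }
    status = Apply(status, await completedCallback(status));
    return status; // step 9: the final Status
}

// A callback's returned status replaces the current one, unless it is Fatal
string Apply(string current, string fromCallback) =>
    current == "Fatal" ? current : fromCallback;

// RunAsync reports a known error, which the completed callback handles
string handled = await RunWorkerAsync(
    () => Task.FromResult("Continue"),
    () => Task.FromResult("Error"),
    s => Task.FromResult(s),
    s => Task.FromResult(s == "Error" ? "Succeeded" : s));

// The same kind of callback cannot turn a Fatal outcome into Succeeded
string fatal = await RunWorkerAsync(
    () => Task.FromResult("Continue"),
    () => Task.FromResult("Fatal"),
    s => Task.FromResult(s),
    s => Task.FromResult("Succeeded"));

Console.WriteLine($"{handled} {fatal}");
```

    In this model the Error case skips step 7 (no children are run) even though the final status ends up as Succeeded, mirroring the "If no errors above" condition in step 7.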

    Succeeded, Canceled, Error, or Fatal

    The system performs additional steps after the worker is completed:

    10: Call any AddChildCompletedCallback(Action<WorkerBase>) callbacks added to the worker parent

    11: Remove any worker children, unless disabled with KeepChildrenLevels

    12: Dispose any disposables added with DisposeOnFinished<TDisposable>(TDisposable)

    WorkerSystem Life-cycle

    The following is a detailed description of the WorkerSystem life-cycle, which has similar but fewer steps than a worker:

    Created

    1: The library user creates the worker system by using new, and adds logic for it to run, e.g.

    var workerSystem = new WorkerSystem("Refresh Staging")
        .Root(ws => { /* ... */ });
    

    Running

    2: The library user starts the worker system by calling StartAsync() or Start()

    The system will:

    3: Call and await any callbacks that have been added with AddStartingCallback(Func<TDerived, Task<ProgressStatus>>). If any callback returned Succeeded or a failure status, skip to step 6

    4: Call and await the callback added with Root(), if any

    5: If no errors above, call and await RunChildrenAsync() to run any worker children unless the library user has already called it in the Root() callback

    • RunChildrenAsync() also calls and incorporates the result of any callbacks added with AddStartingChildrenCallback(Func<WorkerParent, Task<ProgressStatus>>).

    6: Call and await any callbacks that have been added with AddCompletedCallback(Func<TDerived, OutcomeStatus, Task<OutcomeStatus>>), passing in the last OutcomeStatus of the previous steps (normally Succeeded)

    7: Set the worker system's final Status to the last status from the previous steps, by default Succeeded

    Note

    Callbacks in step 6 take the current status as input, and can change it (unless it's Fatal), e.g. from Error to Succeeded when handling a known error.

    Succeeded, Canceled, Error, or Fatal

    The system performs an additional step after setting the completion status:

    8: Dispose any disposables added with DisposeOnFinished<TDisposable>(TDisposable)

    See Also

    • Common Tasks
    • Getting Started
      • Dotnet Templates
      • Add actionETL Manually
      • Development Guidelines
    • Worker System
    • Workers
    • Troubleshooting
    Copyright © 2023 Envobi Ltd