    Dataflow Blocking and Row Buffering

    This article describes dataflow row blocking and buffering:

    • Blocking - The delaying or stopping of:
      • Rows being taken from or sent to a worker port, or
      • Worker results being created
    • Buffering - Ports and workers can process and transfer rows either one by one or multiple rows at a time. To support this, ports use row buffers, which improve overall performance by smoothing out speed differences between upstream and downstream workers and by minimizing the overhead of moving rows between workers. Increasing buffering too much can however exhaust available memory, leading to poor performance.

    This information is useful for understanding and troubleshooting dataflow memory use, and any delays in row processing.

    Note

    Also see the DataflowThroughput Sample project, which generates and benchmarks a large amount of dataflow traffic.

    Configurable Members

    Port RowsPerBuffer and WorkerParent.BytesPerRowBuffer

    Each pair of linked (upstream output and downstream input) ports has a specific number of RowsPerBuffer, which is the recommended and maximum number of rows when synchronously sending rows to or taking rows from the ports in batches.

    Note

    The RowsPerBuffer setting still affects port buffering and transfer speed when taking or sending rows asynchronously or row by row, but is otherwise less important in these cases.

    By default, RowsPerBuffer is automatically calculated by dividing WorkerParent.BytesPerRowBuffer by an upfront estimate of the average row size in bytes. The calculation is then skewed to reduce the likelihood of using very small buffers (64 rows or less). In most cases, this approach results in a good balance between dataflow memory use, buffering to avoid row starvation, and speed of dataflow transfers.

    Note
    • The actual WorkerParent.BytesPerRowBuffer property value is logged with the System.Status.Running category, and also with a Worker.Status.Running category if the value differs from its parent worker or worker system:

      ... | System.Status.Running | BytesPerRowBuffer=65536

      ... | Worker.Status.Running | BytesPerRowBuffer=400000

    • The actual RowsPerBuffer value is logged for each port when the worker runs, here 1024 rows:

      ... | Worker.Status.Running | Inputs[Input]=Outputs[/MyWorker.Outputs[Output]]:1024rows
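The default calculation can be sketched as follows. This is an illustrative model, not the library's implementation: the exact skew toward larger buffers is an internal detail, so the flat 64-row floor below is only an assumption standing in for it.

```python
def default_rows_per_buffer(bytes_per_row_buffer, estimated_row_bytes):
    # Approximate the automatic calculation: divide the byte budget by the
    # estimated average row size. The library also skews the result away
    # from very small buffers; a flat 64-row floor stands in for that here.
    return max(bytes_per_row_buffer // estimated_row_bytes, 64)

# With the default 65536 BytesPerRowBuffer, and assuming rows estimated
# at 64 bytes each, this would reproduce the 1024-row value in the log
# excerpt above:
print(default_rows_per_buffer(65536, 64))  # 1024

# A large estimated row size is pushed up to the illustrative floor:
print(default_rows_per_buffer(65536, 4096))  # 64
```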

    If the resulting default value of RowsPerBuffer for a port is too large (consuming too much memory) or too small (providing too little buffering), you can:

    • Change WorkerParent.BytesPerRowBuffer from its default value of 65536. This affects all ports of any descendant workers and, if set on a worker, any input ports the worker itself has.

      Note: This is a good way to increase or decrease row buffer size and memory use across multiple workers.

    • Explicitly set InputPort.RowsPerBuffer, which overrides the automatically calculated value.

      Note: This is a good way to increase or decrease row buffer size and memory use of individual input ports.

    Increasing RowsPerBuffer can improve performance, but also consumes more memory. Setting RowsPerBuffer too large should be avoided, since it can cause the application to swap large amounts of memory out to disk, leading to drastically reduced performance.

    The above properties can be set directly as well as with configurations (InputPortRowsPerBuffer and WorkerParentBytesPerRowBuffer), with a precedence from highest to lowest:

    1. InputPort.RowsPerBuffer configuration with applyTo specified (i.e. a single port)
    2. InputPort.RowsPerBuffer property
    3. WorkerParent.BytesPerRowBuffer configuration with applyTo specified (i.e. a specific worker or worker system and its descendants)
    4. WorkerParent.BytesPerRowBuffer property
    5. WorkerParent.BytesPerRowBuffer inherited property value from parent worker or worker system
    6. InputPort.RowsPerBuffer configuration without applyTo specified (i.e. all workers and the worker system)
    7. WorkerParent.BytesPerRowBuffer configuration without applyTo specified (i.e. all workers and the worker system)
    8. WorkerParent.BytesPerRowBuffer default value in the worker system: 65536 bytes
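The precedence list can be modeled as a first-match lookup. In this sketch, `settings` and its source names are invented for illustration (they are not library identifiers); bytes-based sources are converted to rows using the estimated row size, as in the default calculation above.

```python
# Precedence order from the list above: the first source present wins.
PRECEDENCE = [
    ("rows",  "port_config_applyto"),    # 1. InputPort.RowsPerBuffer config, applyTo set
    ("rows",  "port_property"),          # 2. InputPort.RowsPerBuffer property
    ("bytes", "parent_config_applyto"),  # 3. WorkerParent.BytesPerRowBuffer config, applyTo set
    ("bytes", "parent_property"),        # 4. WorkerParent.BytesPerRowBuffer property
    ("bytes", "inherited_property"),     # 5. inherited from parent worker / worker system
    ("rows",  "port_config_global"),     # 6. InputPort.RowsPerBuffer config, no applyTo
    ("bytes", "parent_config_global"),   # 7. WorkerParent.BytesPerRowBuffer config, no applyTo
    ("bytes", "system_default"),         # 8. worker system default
]

def effective_rows_per_buffer(settings, estimated_row_bytes):
    # The 65536-byte system default is always present as the last resort.
    settings = {**settings, "system_default": 65536}
    for unit, source in PRECEDENCE:
        if source in settings:
            value = settings[source]
            return value if unit == "rows" else value // estimated_row_bytes

# No explicit settings: the 65536-byte default with 64-byte rows gives 1024 rows.
print(effective_rows_per_buffer({}, 64))  # 1024

# An explicit port property beats a parent BytesPerRowBuffer property.
print(effective_rows_per_buffer(
    {"port_property": 500, "parent_property": 131072}, 64))  # 500
```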

    Also see Buffering and Memory Consumption below.

    Port BufferingMode

    Each pair of linked (upstream output and downstream input) ports has a specific BufferingMode, which controls whether it only buffers up to a Limited number of rows (the most common case, currently four times RowsPerBuffer), or whether it has Full (i.e. unlimited) buffering. See PortBufferingMode for further details.

    Worker IsStartable

    The library user can delay the start of a worker by using IsStartable, which also delays any row processing.

    Blocking

    Most dataflow workers and input ports are non-blocking, i.e. while they might process multiple rows at a time if multiple rows are available, they:

    • Never require accessing multiple input rows before consuming them
    • Consume input rows quickly, without rows ever getting stuck in the port or worker buffers waiting for other input rows or external events

    Examples include CollectionTarget<TInput> and UnionAllTransform<TInputOutput>. Note that despite having multiple input ports, the latter is implemented to be non-blocking, which is possible since the ports are independent.

    There are however workers and scenarios that can delay or stop rows from being taken or sent, or results from being created, as described below.

    Worker Start Constraints

    By default, dataflow source workers are always startable, while transforms and targets are not started until at least one input has gone beyond the Ready state (i.e. an upstream worker sends data or port completion).

    Important

    To avoid dataflow deadlocks, all dataflow source workers that are linked to each other via ports (normally indirectly via transforms and targets) will be started as a group, but only when all of their (default or custom) start constraints are startable.

    Note

    An upstream worker can always send some initial rows until the port buffer fills up, even if the downstream worker is not allowed to start. If the BufferingMode is Full, upstream can actually send all rows without the downstream worker starting. This will obviously increase memory use.

    If the library user sets a custom start constraint on a dataflow worker, this can of course block row processing.

    Important

    Setting start constraints between transform and/or target workers that share a common upstream worker can lead to deadlock. To avoid this, do one of the following:

    • Ensure that either the worker with the start constraint or one of its upstream workers (before the shared common worker) already uses full buffering, or
    • Set the input port of the worker with the start constraint to use PortBufferingMode.Full. See e.g. the AdbTransactionActionWorker Example.

    Also see Start Constraints in Worker Execution.

    Multi-Row Dependencies

    Some workers must access more than one row at a time (from the same or a different input port) to continue, which can delay row processing.

    If row buffering is required, these workers either use a BufferingMode of Full or implement buffering internally; either approach increases memory use.

    These workers can be categorized as partially or fully blocking:

    Partially Blocking

    • How long processing is blocked (and potentially how many rows they must buffer) depends on the incoming data (or the worker configuration)
    • The worker can (but is not required to) produce output rows before seeing all input rows

    Examples include:

    • Multi-input workers where the inputs are dependent, e.g. MergeSortedTransform<TInputOutput> and the various *JoinMergeSortedTransform workers (see InnerJoinMergeSortedTransform<TLeftInput, TRightInput, TOutput> for how to minimize their memory usage)
    • Single-input workers with multi-row dependencies, e.g. an AggregateTransform worker with a custom aggregation that outputs the largest of each set of 100 incoming rows.

    Fully Blocking

    A few workers and input ports are fully blocking, where all incoming rows must be received before processing can continue. Examples include:

    • SortTransform<TInputOutput> (which must buffer all rows, with associated memory use)
    • AggregateTransform workers (which do not require buffering rows), such as AggregateTransform<TInputAccumulateOutput>

    Back Pressure

    If a downstream worker consumes its input rows slower than the upstream worker produces them (or it's blocked from reading its input rows), and the BufferingMode is Limited, then the port buffer will fill up and the upstream worker will be prevented from sending more rows until the buffer clears. This is also why workers must never send more rows to a data output port than there is row demand (or an error will be raised). Also see Row Flow Control.

    This process is called back pressure, and is part of the row flow control that reduces overall memory use. Do note however that ports that have full buffering, such as the SortTransform input buffering, are not affected by back pressure.
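A Limited port buffer behaves much like a bounded queue. This minimal Python model (not the library's implementation) uses a 4-slot queue to stand in for the 4 * RowsPerBuffer capacity:

```python
import queue

# Model a Limited port buffer as a bounded queue of 4 "rows".
buf = queue.Queue(maxsize=4)

for row in range(4):
    buf.put_nowait(row)    # upstream sends rows until the buffer is full

try:
    buf.put_nowait(4)      # the fifth row is refused...
    blocked = False
except queue.Full:
    blocked = True         # ...so the upstream worker must wait (back pressure)

buf.get_nowait()           # downstream consumes one row,
buf.put_nowait(4)          # which lets upstream send again

print(blocked)  # True
```

A Full buffer corresponds to an unbounded queue: the producer is never refused, at the cost of unbounded memory growth.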

    Worker Specific External Conditions

    Any worker can implement additional conditions that might delay processing. The most common example is source workers that access external data sources, e.g. a database, where retrieving the data can be delayed.

    Other examples include waiting for a trigger file, or for a timer to expire.

    Buffering and Memory Consumption

    A few dataflow workers require memory for caches or for storing temporary results, e.g. DictionaryLookupTransformFactory and AggregateTransformFactory workers.

    For most dataflow workers, however, their port (and any internal row) buffers constitute their only significant memory usage. Specifically (although subject to change):

    • Each actual row in a port or worker internal buffer consumes the memory needed for the row class instance, see e.g. https://codeblog.jonskeet.uk/2011/04/05/of-memory-and-strings for a detailed description
    • Additional memory is required to hold the row references in the port or any internal worker buffers. Each reference consumes 4 (on 32-bit) or 8 (on 64-bit) bytes:

      • BufferingMode = Limited uses 4 * RowsPerBuffer references
      • BufferingMode = Full uses as many references as needed, in RowsPerBuffer chunks (the same usually holds for any worker internal buffers)
      Note

      To minimize the risk of the application running out of memory, run it as a 64-bit process. The most common choice is to compile the application for Any CPU and disable the Prefer 32-bit project build property (which runs as 64-bit on a 64-bit operating system). Alternatively, compile for x64 (which can only run as 64-bit).

      As a side benefit, running 64-bit instead of 32-bit also makes the dataflow slightly faster in many cases.
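Putting the reference and row-instance costs together gives a rough per-buffer estimate. This is a sketch under the figures above; actual row instance sizes depend on the row class layout (see the linked article on object memory):

```python
def buffer_memory_bytes(rows_per_buffer, avg_row_bytes, ref_bytes=8,
                        buffered_rows=None):
    # A Limited buffer holds references for 4 * RowsPerBuffer rows;
    # ref_bytes is 8 on 64-bit and 4 on 32-bit. Each row actually present
    # additionally costs its own instance size. By default we assume the
    # worst case of a completely full buffer.
    capacity = 4 * rows_per_buffer
    if buffered_rows is None:
        buffered_rows = capacity
    return capacity * ref_bytes + buffered_rows * avg_row_bytes

# Worst case for 1024 rows per buffer and (assumed) 64-byte rows on 64-bit:
# 4096 references * 8 bytes + 4096 rows * 64 bytes.
print(buffer_memory_bytes(1024, 64))  # 294912
```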

    Reducing Memory Use

    If a worker system is memory constrained:

    • Avoid running too many large-volume fully or partially blocking workers (SortTransform etc.) in parallel. Use start constraints (including between dataflow workers) and optionally group workers to control what runs in parallel.
    • Reduce row buffer sizes for non-blocking and partially blocking workers (this doesn't help fully blocking workers). See above for details.
    • Reduce the memory consumed by each row by removing unnecessary columns (including in any source extract queries) and reduce the size (e.g. the precision) of individual columns
    • Consider offloading some memory consuming steps to e.g. a database
    • Consider breaking the processing into multiple stages, potentially staging intermediate results to temporary storage

    See Also

    • Dataflow
      • Dataflow Columns
      • Compare Dataflow Columns
      • Dataflow Column Mapping
      • InputPort.RowsPerBuffer
      • WorkerParent.BytesPerRowBuffer
      • BufferingMode
      • PortBufferingMode
      • DataflowThroughput Sample project
    Copyright © 2023 Envobi Ltd