Dataflow Blocking and Row Buffering
This article describes dataflow row:
- Blocking - The delaying or stopping of:
- Rows being taken from or sent to a worker port, or
- Worker results being created
- Buffering - Ports and workers can process and transfer rows either one by one, or multiple rows at a time. To support this, ports use row buffers, which improves overall performance by smoothing out differences in speed between the upstream and downstream workers, and minimizes the overhead of moving rows between workers. Increasing buffering too much can however exhaust available memory, leading to poor performance.
This information is useful for understanding and troubleshooting dataflow memory use, and any delays in row processing.
Note
Also see the
Dataflow
Configurable Members
Port RowsPerBuffer and WorkerParent.BytesPerRowBuffer
Each pair of linked (upstream output and downstream input) ports have a specific
number of Rows
Note
The RowsPerBuffer setting still affects port buffering and transfer speed
when taking or sending rows asynchronously or row by row, but is in these
cases otherwise less important.
By default, RowsPerBuffer is automatically calculated by dividing
Worker
Note
The actual
InputPort.RowsPerBufferproperty value is logged with theSystem.Status.Runningcategory, and also with anyWorker.Status.Runningcategory if the value is different from its parent worker or worker system:... | System.Status.Running | BytesPerRowBuffer=65536
... | Worker.Status.Running | BytesPerRowBuffer=400000
The actual
RowsPerBuffervalue is logged for each port when the worker runs, here 1024 rows:... | Worker.Status.Running | Inputs[Input]=Outputs[/MyWorker.Outputs[Output]]:1024rows
If the resulting default value of RowsPerBuffer for a port is e.g. too large and
consuming too much memory, or too small and providing too little buffering, you can:
Change Worker
Parent. from its defaultBytes Per Row Buffer 65536value. This will affect all ports of any descendant workers, and if this is a worker any input ports it might have.Note: This is a good way to increase or decrease row buffer size and memory use across multiple workers.
Explicitly set Input
Port. , which overrides the automatically calculated value.Rows Per Buffer Note: This is a good way to increase or decrease row buffer size and memory use of individual input ports.
Increasing RowsPerBuffer can improve performance, but also consumes more
memory. Setting RowsPerBuffer too large should be avoided, since it can
cause the application to swap large amounts of memory out to disk, leading to
drastically reduced performance.
The above properties can be set directly as well as with configurations
(Input
InputPort.RowsPerBufferconfiguration withapplyTospecified (i.e. a single port)InputPort.RowsPerBufferpropertyWorkerParent.BytesPerRowBufferconfiguration withapplyTospecified (i.e. a specific worker or worker system and its descendants)WorkerParent.BytesPerRowBufferpropertyWorkerParent.BytesPerRowBufferinherited property value from parent worker or worker systemInputPort.RowsPerBufferconfiguration withoutapplyTospecified (i.e. all workers and the worker system)WorkerParent.BytesPerRowBufferconfiguration withoutapplyTospecified (i.e. all workers and the worker system)WorkerParent.BytesPerRowBufferdefault value in the worker system:65536bytes
Also see Buffering and Memory Consumption below.
Port BufferingMode
Each pair of linked (upstream output and downstream input) port has a specific
BufferingLimited number of rows (the most common case,
currently four times RowsPerBuffer), or if it has Full (i.e. unlimited)
buffering. See Port
Worker IsStartable
The user of the worker can delay the start of the worker by using
Is
Blocking
Most dataflow workers and input ports are non-blocking, i.e. while they might process multiple rows at a time if multiple rows are available, they:
- Never require accessing multiple input rows before consuming them
- Consume input rows quickly, without rows ever getting stuck in the port or worker buffers waiting for other input rows or external events
Examples include CollectionTarget<TInput> and UnionAllTransform<TInputOutput>. Note that despite having multiple input ports, the latter is implemented to be non-blocking, which is possible since the ports are independent.
There are however workers and scenarios that can delay or stop rows from being taken or sent, or results from being created, as described below.
Worker Start Constraints
By default, dataflow source workers are always startable, while transforms and
targets are not started until at least one input has gone beyond the Ready
state (i.e. an upstream worker sends data or port completion).
Important
To avoid dataflow deadlocks, all dataflow source workers that are linked to each other via ports (normally indirectly via transforms and targets) will be started as a group, but only when all of their (default or custom) start constraints are startable.
Note
An upstream worker can always send some initial rows until the port buffer
fills up, even if the downstream worker is not allowed to start. If the
BufferingMode is Full, upstream can actually send all rows without
the downstream worker starting. This will obviously increase memory use.
If the library user sets a custom start constraint on a dataflow worker, this can of course block row processing.
Important
Setting start constraints between transform and/or target workers that share a common upstream worker could lead to deadlock. To avoid this, ensure that:
- Either the worker with the start constraint or one of its upstream workers before the shared common worker already uses full buffering, or
- Set the input port of the worker with the start constraint to use
Port
Buffering . See e.g. theMode. Full AdbTransactionActionWorkerExample.
Also see Start Constraints in Worker Execution.
Multi-Row Dependencies
Some workers must access more than one row at a time (from the same or a different input port) to continue, which can delay row processing.
If row buffering is required, they will either use a BufferingMode of Full,
or implement it internally in the worker, either of which will increase
memory use.
These workers can be categorized as partially or fully blocking:
Partially Blocking
- How long processing is blocked (and potentially how many rows they must buffer) depends on the incoming data (or the worker configuration)
- The worker can (but is not required to) produce output rows before seeing all input rows
Examples include:
- Multi-input workers where the inputs are dependent,
e.g. MergeSortedTransform<TInputOutput> and the various
*JoinMergeSortedTransformworkers (see InnerJoinMergeSortedTransform<TLeftInput, TRightInput, TOutput> for how to minimize their memory usage) - Single-input workers with multi-row dependencies, e.g. an
AggregateTransformworker with a custom aggregation that outputs the largest of each set of 100 incoming rows.
Fully Blocking
A few workers and input ports are fully blocking, where all incoming rows must be received before processing can continue. Examples include:
- SortTransform<TInputOutput> (which must buffer all rows, with associated memory use)
AggregateTransformworkers (which do not require buffering rows), such as AggregateTransform<TInputAccumulateOutput>
Back Pressure
If a downstream worker consumes its input rows slower than the upstream worker
produces them (or it's blocked from reading its input rows), and the
BufferingMode is Limited, then the port buffer will fill up and the upstream
worker will be prevented from sending more rows until the buffer clears. This is
also why workers must never send more rows to a data output port than there is
row demand (or an error will be raised). Also see
Row Flow Control.
This process is called back pressure, and is part of the row flow control that
reduces overall memory use. Do note however that ports that have full buffering,
such as the SortTransform input buffering, are not affected by back pressure.
Worker Specific External Conditions
Any worker can implement additional conditions that might delay processing. The most common example is source workers that access external data sources, e.g. a database, where retrieving the data can be delayed.
Other examples include waiting for a trigger file, or for a timer to expire.
Buffering and Memory Consumption
A few dataflow workers require memory for caches or for storing temporary
results, e.g. Dictionary
For most dataflow workers however, their port (and any internal row) buffers constitutes their only significant memory usage. Specifically (although subject to change):
- Each actual row in a port or worker internal buffer consumes the memory needed for the row class instance, see e.g. https://codeblog.jonskeet.uk/2011/04/05/of-memory-and-strings for a detailed description
Additional memory is required to hold the row references in the port or any internal worker buffers. Each reference consumes
4(on 32-bit) or8(on 64-bit) bytes:BufferingMode=Limiteduses4 * RowsPerBufferreferencesBufferingMode=Fulluses as many references as needed, inRowsPerBufferchunks (the same usually holds for any worker internal buffers)
Note
To minimize the risk of the application running out of memory, do run it as a 64-bit process. The most common choice is to compile the application for
Any CPUand disable thePrefer 32-bitproject build property (which will run as 64-bit on a 64-bit operating system.). Alternatively, compile forx64(which can only run as 64-bit).As a side benefit, running 64-bit instead of 32-bit also makes the dataflow slightly faster in many cases.
Reducing Memory Use
If a worker system is memory constrained:
- Avoid running too many large-volume fully or partially blocking workers (
SortTransformetc.) in parallel. Use start constraints (including between dataflow workers) and optionally group workers to control what runs in parallel. - Reduce row buffer sizes for non-blocking and partially blocking workers (it doesn't help fully blocking workers.) See above for details.
- Reduce the memory consumed by each row by removing unnecessary columns (including in any source extract queries) and reduce the size (e.g. the precision) of individual columns
- Consider offloading some memory consuming steps to e.g. a database
- Consider breaking the processing into multiple stages, potentially staging intermediate results to temporary storage