Dataflow Blocking and Row Buffering
This article describes dataflow row:
- Blocking - The delaying or stopping of rows being taken or sent, or results being created
- Buffering - Ports and workers can process and transfer rows either one by one, or multiple rows at a time. To support this, ports use row buffers, which improve overall performance by smoothing out differences in speed between the upstream and downstream workers, and by minimizing the overhead of moving rows between workers. Increasing buffering too much can, however, exhaust available memory, leading to poor performance.
This information is useful for understanding and troubleshooting dataflow memory use, and any delays in row processing.
Also see the DataflowThroughput Sample project, which generates and benchmarks a large amount of dataflow traffic.
Each pair of linked ports (an upstream output port and a downstream input port) has a specific BufferCapacity, which is both the recommended and the maximum number of rows to send to or take from the ports when transferring rows synchronously in batches (as opposed to asynchronously or row by row).
Increasing BufferCapacity can improve performance, but also consumes more memory. Setting BufferCapacity too large should be avoided, since it can cause the application to swap large amounts of memory out to disk, leading to drastically reduced performance.
See BufferCapacity for its default value and for how to change it.
Each pair of linked ports (an upstream output port and a downstream input port) has a specific BufferingMode, which controls whether it only buffers up to a Limited number of rows (the most common case, currently four times BufferCapacity), or if it has Full (i.e. unlimited) buffering. See PortBufferingMode for further details.
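The batch size limit and the Limited buffer size described above can be illustrated with a small, library-neutral sketch. It is written in Python and does not use the dataflow library's API; the names `batches`, `limited_buffer_rows`, and `LIMITED_FACTOR` are hypothetical, and the factor of four simply mirrors the "currently four times BufferCapacity" statement above.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

# Assumption: mirrors the "currently four times BufferCapacity" rule in the text.
LIMITED_FACTOR = 4


def batches(rows: Iterable[T], buffer_capacity: int) -> Iterator[List[T]]:
    """Yield lists of at most buffer_capacity rows, preserving row order."""
    if buffer_capacity < 1:
        raise ValueError("buffer_capacity must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, buffer_capacity))
        if not batch:
            return  # no rows left
        yield batch


def limited_buffer_rows(buffer_capacity: int) -> int:
    """Maximum number of rows a Limited port buffer would hold in this model."""
    return LIMITED_FACTOR * buffer_capacity
```

For example, ten rows sent with a capacity of four would travel as batches of four, four, and two rows, while the modeled Limited buffer could hold up to sixteen rows.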
The user of the worker can delay the start of the worker by using IsStartable, which would also delay any row processing.
Most dataflow workers and input ports are non-blocking, i.e. while they might process multiple rows at a time if multiple rows are available, they:
- Never require accessing multiple input rows before consuming them
- Consume input rows quickly, without rows ever getting stuck in the port or worker buffers waiting for other input rows or external events
Examples include CollectionTarget<TInput> and UnionAllTransform<TInputOutput>. Note that despite having multiple input ports, the latter is implemented to be non-blocking, which is possible since the ports are independent.
There are however workers and scenarios that can delay or stop rows from being taken or sent, or results from being created, as described below.
Worker Start Constraints
By default, dataflow source workers are always startable, while transforms and
targets are not started until at least one input has gone beyond the
state (i.e. an upstream worker sends data or port completion).
To avoid dataflow deadlocks, all dataflow source workers that are linked to each other via ports (normally indirectly via transforms and targets) will be started as a group, but only when all of their (default or custom) start constraints are startable.
An upstream worker can always send some initial rows until the port buffer fills up, even if the downstream worker is not allowed to start. If the BufferingMode is Full, the upstream worker can in fact send all its rows without the downstream worker starting, which increases memory use.
If the library user sets a custom start constraint on a dataflow worker, this can of course block row processing.
Setting start constraints between transform and/or target workers that share a common upstream worker could lead to deadlock. To avoid this, ensure that:
Also see Start Constraints in Worker Execution.
Some workers must access more than one row at a time (from the same or a different input port) to continue, which can delay row processing.
If row buffering is required, they will either use port buffering or implement it internally in the worker, either of which will increase memory use.
These workers can be categorized as partially or fully blocking:
- How long processing is blocked (and potentially how many rows they must buffer) depends on the incoming data (or the worker configuration)
- The worker can (but is not required to) produce output rows before seeing all input rows
- Multi-input workers where the inputs are dependent, e.g. MergeSortedTransform<TInputOutput> and the various *JoinMergeSortedTransform workers (see InnerJoinMergeSortedTransform<TLeftInput, TRightInput, TOutput> for how to minimize their memory usage)
- Single-input workers with multi-row dependencies, e.g. an AggregateTransform worker with a custom aggregation that outputs the largest of each set of 100 incoming rows.
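The "largest of each set of 100 incoming rows" example can be sketched as a Python generator. This is a library-neutral illustration rather than the AggregateTransform API; `largest_per_set` is a hypothetical name, and emitting a result for a final partial set is an assumption of this sketch.

```python
from typing import Iterable, Iterator, Optional


def largest_per_set(rows: Iterable[int], set_size: int = 100) -> Iterator[int]:
    """Emit the largest value of each consecutive set of set_size rows."""
    if set_size < 1:
        raise ValueError("set_size must be at least 1")
    largest: Optional[int] = None
    count = 0
    for row in rows:
        if largest is None or row > largest:
            largest = row
        count += 1
        if count == set_size:
            # Output is delayed until the whole set has been seen,
            # but not until all input has been seen: partially blocking.
            yield largest
            largest, count = None, 0
    if count:
        # Assumption: a final, incomplete set still produces a result.
        yield largest
```

Only a running maximum is kept, so this particular aggregation delays output without buffering any rows; an aggregation that needed the rows themselves would have to buffer up to one full set.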
A few workers and input ports are fully blocking, where all incoming rows must be received before processing can continue. Examples include:
- SortTransform<TInputOutput> (which must buffer all rows, with associated memory use)
- AggregateTransform workers (which do not require buffering rows), such as AggregateTransform<TInputAccumulateOutput>
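The difference between the two fully blocking examples can be shown with a library-neutral Python sketch. Both functions below (hypothetical names, not the library's API) must see every input row before producing a result, but only the sort has to hold all rows in memory.

```python
from typing import Iterable, Iterator, List


def sort_all(rows: Iterable[int]) -> Iterator[int]:
    """Fully blocking with buffering: every row is held until input ends."""
    buffered: List[int] = list(rows)  # memory grows with the number of rows
    buffered.sort()
    yield from buffered


def total(rows: Iterable[int]) -> int:
    """Fully blocking without buffering: only the accumulator is kept."""
    accumulator = 0
    for row in rows:
        accumulator += row  # each row can be discarded after this step
    return accumulator
```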
If a downstream worker consumes its input rows slower than the upstream worker produces them (or it is blocked from reading its input rows), and the BufferingMode is Limited, then the port buffer will fill up and the upstream worker will be prevented from sending more rows until the buffer clears. This is also why workers must never send more rows to a data output port than there is row demand (or an error will be raised). Also see Row Flow Control.
This process is called back pressure, and is part of the row flow control that reduces overall memory use. Note, however, that ports with Full buffering, such as the SortTransform input port, are not affected by back pressure.
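Back pressure can be demonstrated with a bounded queue standing in for a Limited port buffer. The sketch below uses Python's standard `queue` and `threading` modules, not the dataflow library; `run_pipeline` and its parameters are hypothetical, and a blocking `put` is only an analogy for how an upstream worker is held back.

```python
import queue
import threading
import time
from typing import List, Tuple

_DONE = object()  # sentinel standing in for port completion in this sketch


def run_pipeline(row_count: int, buffer_rows: int,
                 consumer_delay: float = 0.001) -> Tuple[List[int], int]:
    """Run a fast producer against a slow consumer over a bounded buffer.

    Returns the rows received and the largest buffer size observed.
    """
    port: queue.Queue = queue.Queue(maxsize=buffer_rows)
    received: List[int] = []
    max_buffered = 0

    def upstream() -> None:
        for row in range(row_count):
            port.put(row)  # blocks while the buffer is full: back pressure
        port.put(_DONE)

    producer = threading.Thread(target=upstream)
    producer.start()
    while True:
        max_buffered = max(max_buffered, port.qsize())
        item = port.get()
        if item is _DONE:
            break
        received.append(item)
        time.sleep(consumer_delay)  # simulate a slow downstream worker
    producer.join()
    return received, max_buffered
```

Calling `run_pipeline(50, 8)` delivers all 50 rows in order while the observed buffer size never exceeds 8. Creating the queue with `maxsize=0` makes it unbounded in Python, which is loosely analogous to Full buffering: the producer is never held back and memory use grows instead.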
Worker Specific External Conditions
Any worker can implement additional conditions that might delay processing. The most common example is source workers that access external data sources, e.g. a database, where retrieving the data can be delayed.
Other examples include waiting for a trigger file, or for a timer to expire.
Buffering and Memory Consumption
For most workers, however, their port (and any internal row) buffers constitute their only significant memory usage. Specifically (although subject to change):
- Each actual row in a port or worker internal buffer consumes the memory needed for the row class instance, see e.g. https://codeblog.jonskeet.uk/2011/04/05/of-memory-and-strings for a detailed description
- Additional memory is required to hold the row references in the port or any internal worker buffers. Each reference consumes 4 (on 32-bit) or 8 (on 64-bit) bytes:
  - Limited uses 4 * BufferCapacity references
  - Full uses as many references as needed, in BufferCapacity chunks (the same usually holds for any worker internal buffers)
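The reference overhead described above can be estimated with simple arithmetic. The following Python sketch encodes the figures from the list (4 or 8 bytes per reference, four times BufferCapacity for Limited, whole BufferCapacity chunks for Full); the function names are hypothetical, the capacity of 256 in the example is arbitrary, and the memory for the row instances themselves is not included.

```python
def reference_size(is_64_bit: bool = True) -> int:
    """Bytes per row reference: 8 on 64-bit, 4 on 32-bit."""
    return 8 if is_64_bit else 4


def limited_reference_bytes(buffer_capacity: int, is_64_bit: bool = True) -> int:
    """Reference memory for a Limited buffer: 4 * BufferCapacity references."""
    return 4 * buffer_capacity * reference_size(is_64_bit)


def full_reference_bytes(row_count: int, buffer_capacity: int,
                         is_64_bit: bool = True) -> int:
    """Reference memory for a Full buffer, allocated in BufferCapacity chunks."""
    chunks = -(-row_count // buffer_capacity)  # ceiling division
    return chunks * buffer_capacity * reference_size(is_64_bit)
```

With a capacity of 256, a Limited buffer would need 8,192 bytes of references on 64-bit, and a Full buffer holding 1,000 rows would need four chunks, also 8,192 bytes. In most cases the row instances dominate, which is why reducing row size is usually more effective than tuning reference overhead.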
To minimize the risk of the application running out of memory, run it as a 64-bit process. The most common choice is to compile the application for Any CPU and disable the Prefer 32-bit project build property (which will run as 64-bit on a 64-bit operating system). Alternatively, compile for x64 (which can only run as 64-bit).
As a side benefit, running 64-bit instead of 32-bit also makes the dataflow slightly faster in many cases.
Reducing Memory Use
If a worker system is memory constrained:
- Avoid running too many large-volume buffering workers (SortTransform etc.) in parallel. Use start constraints (including between dataflow workers) and optionally group workers to control what runs in parallel.
- If individual rows consume a large amount of memory, reduce BufferCapacity for some or all dataflow workers
- Reduce the memory consumed by each row by removing unnecessary columns (including in any source extract queries) and by reducing the size (e.g. the precision) of individual columns
- Consider offloading some memory-consuming steps to e.g. a database
- Consider breaking the processing into multiple stages, potentially staging intermediate results to permanent storage