Dataflow Row Errors
Apart from the data input and output ports, dataflow workers can optionally also have one or more error output ports, i.e. ErrorOutputPort<TError>. When a worker experiences a problem with a row, e.g. a column having the wrong content that the worker can't fix, it should send the faulty row to an error output port. This enables:
- Tracking and logging the number of error rows created per error port, per worker, and aggregated up through all ancestors to the worker system
- Logging the contents of error rows to the logging system
- Sending the error rows to a downstream worker for further processing, e.g. to correct them, or to store them in a database
- Setting limits on maximum number of error rows before logging warnings and errors, and before failing the worker
Error output ports work similarly to data output ports, except that:
- They are only used for rejecting error rows, since they affect row error counts and are logged as errors (non-error rows should always use regular data output ports)
- Linking to an input port is optional. Rows sent to an unlinked error output port will still be (potentially) logged to the logging system, and maximum error count limits will still be respected.
- There are only synchronous error output methods, which are particularly easy to use. These are most commonly used when developing custom dataflow workers, or when using dataflow workers that take callbacks as parameters, e.g. RowsActionTransform<TInputOutputError>:
  - SendErrorRow(...) overloads both log the content of the error row and send the row to any downstream worker
  - LogRowError(...) overloads only log the content of the error row to the logging system
- They are automatically completed successfully when the worker completes, so it's not required to call SendSucceeded() or SendError() on them.
  - It is however best practice to SendSucceeded() or SendError() the error port if the worker will continue running after the last possible error port row has been sent.
- They always have Full buffering.
- Note: To avoid running out of memory where a very large number of error rows might be produced, limit the number of allowed error rows (which is also the default behavior) using MaxRowsSent and/or MaxRowsBeforeError, as sketched below.
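A minimal sketch of configuring these limits, reusing the FileHelperFileSource and worker system context from the "Logging Only Example" further down (the MaxRowsSent value below is an assumption for illustration, not taken from the examples):
var source = new FileHelperFileSource<Product>(ws, "Read Product"
    , @"Src/LogErrorRows/Product.csv");

// Allow up to 20 error rows before the source worker is failed:
source.ErrorOutput.MaxRowsBeforeError = 20;

// MaxRowsSent (assumed here to sit on the same port) caps the total
// number of error rows the port will send:
source.ErrorOutput.MaxRowsSent = 1000;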
Note
In row error handling code, always use SendErrorRow(...) instead of LogRowError(...) where possible (since the port can be left unlinked anyway), except when a meaningful error row is not available and cannot be created.
Important
If the SendErrorRow(...) or LogRowError(...) methods return a failure status, you must fail your worker, and use the returned ProgressStatus as the error status and message (calling a ToOutcomeStatus(Boolean) overload if needed).
Some examples:
- A FileHelperFileSource<TOutput> source worker that extracts a CSV file might encounter a badly formatted row, and reject the bad CSV row (as a string, with extra information) to its error output port
- A RowActionTransform<TInputOutputError> transform worker that filters incoming rows:
  - Allows the callback supplied by the library user to reject any incoming row to the error output port
  - Automatically rejects to the error output port any rows for which the callback supplied by the library user throws an exception
- A target worker for inserting rows into a database could reject any rows that generated a database exception to the error output port
On the other hand, a random number generator source worker would never generate a bad row, and would therefore not have any error output ports configured (and any exception would fail the worker).
Logging Only Example
Rows sent to an error output port can be automatically logged to the logging system, including showing the row contents. This is often sufficient for troubleshooting (which can obviate the need to add workers to explicitly store error rows).
This CSV source file will be loaded by the ETL application below:
0,Accelerated,4.2,Motor,Electrical
1,Nitro,13.5
2,Winter Warmer,16
3,Plus Pro,19
Only the second and third lines will be inserted into the database; the other lines have errors that will be rejected:
- The first line has too many columns, and will be logged and then discarded
- The last line has ProductWeight == 0, which the callback below specifies should be logged and discarded
By default, the row errors would have stopped the load (logging the first error), but here we allow up to 20 row errors in the source worker, and up to 5 in the transform.
using actionETL;
using actionETL.Adb;
using actionETL.Adb.SqlClientExternal;
using actionETL.FileHelper;
using FileHelpers;
using System;

public static partial class LogErrorRows
{
    [DelimitedRecord(",")]
    public class Product
    {
        public int ProductId { get; set; }
        public string ProductName { get; set; }
        public double ProductWeight { get; set; }
    }

    private static SystemOutcomeStatus RunExample()
    {
        var workerSystem = new WorkerSystem("Stage Product")
            .Root(ws =>
            {
                var provider = AdbSqlClientProvider.Get();

                var source = new FileHelperFileSource<Product>(ws, "Read Product"
                    , @"Src/LogErrorRows/Product.csv");
                source.ErrorOutput.MaxRowsBeforeError = 20;

                var transform = source.Output.Link.RowActionTransform1(
                    "Round ProductWeight", row =>
                    {
                        row.ProductWeight = Math.Round(row.ProductWeight, 2);
                        // Reject and log products with missing weights:
                        return row.ProductWeight <= 0.0
                            ? TransformRowTreatment.Reject : TransformRowTreatment.Send;
                    });
                transform.ErrorOutput.MaxRowsBeforeError = 5;

                transform.Output.Link.AdbInsertTarget("Insert Product"
                    , provider.CreateConnectionBuilder(ws.Config["SqlServer"])
                    , "dbo.Product");
            });

        return workerSystem.Start();
    }
}

/* The example assumes the following table already exists:
CREATE TABLE [dbo].[Product]
(
    [ProductId] INT NOT NULL PRIMARY KEY,
    [ProductName] NCHAR(50) NOT NULL,
    [ProductWeight] FLOAT NULL
)
*/
Both error rows in the above example will appear in detail in the logging output, which greatly simplifies troubleshooting and often removes the need to add additional workers for error row handling.
For the "Read Product" source, the first CSV line logs:
- A warning with:
  - The CSV file line number: RowNumber=0
  - The CSV row contents: Row='0,Accelerated,13.9,Motor,Electrical'
  - An exception, describing the issue: BadUsageException: Line: 1 Column: 14. Delimiter ',' found after the last field 'ProductWeight'
- The worker row counts, including: [ErrorOutput]=1 rows
2018-02-06 01:29:20.2538 | WARN | /Stage Product/Read Product.ErrorOutputs[ErrorOutput] | ErrorOutputPort.RowLogged | RowNumber=0. Row='0,Accelerated,13.9,Motor,Electrical' :EXCEPTION OCCURRED: FileHelpers.BadUsageException: Line: 1 Column: 14. Delimiter ',' found after the last field 'ProductWeight' (the file is wrong or you need to add a field to the record class)
2018-02-06 01:29:20.3908 | INFO | /Stage Product/Read Product | Worker.Statistics | Started=2018-02-06T01:29:20Z Completed=2018-02-06T01:29:20Z Duration=0:00:00:00.1660095 Outputs[Output]=3 rows, 18 rows/s, active 0.148s, active 20 rows/s, BufferExhaustedCount 0 ErrorOutputs[ErrorOutput]=1 rows, 6 rows/s, active 0.117s, active 8 rows/s, BufferExhaustedCount 0
For the "Round ProductWeight" transform, the last CSV line logs:
- A warning with:
  - The row number: RowNumber=2
  - The row contents: ProductId='3' ProductName='Plus Pro' ProductWeight='0'
- The worker row counts, including: [ErrorOutput]=1 rows
2018-02-06 01:29:20.2698 | WARN | /Stage Product/Round ProductWeight.ErrorOutputs[ErrorOutput] | ErrorOutputPort.RowLogged | RowNumber=2. ProductId='3' ProductName='Plus Pro' ProductWeight='0'
2018-02-06 01:29:20.3908 | INFO | /Stage Product/Round ProductWeight | Worker.Statistics | Started=2018-02-06T01:29:20Z Completed=2018-02-06T01:29:20Z Duration=0:00:00:00.035002 Inputs[Input]=3 rows, 85 rows/s, active 0.145s, active 20 rows/s, BufferExhaustedCount 2 Outputs[Output]=2 rows, 57 rows/s, active 0.121s, active 16 rows/s, BufferExhaustedCount 0 ErrorOutputs[ErrorOutput]=1 rows, 28 rows/s, active 0.114s, active 8 rows/s, BufferExhaustedCount 0
Note
- By default, only the first 10 error rows are logged with row contents, see MaxRowsLogged (sketched below).
- Since we're not linking any error output ports, we have no use for capturing error details, and therefore haven't added any CaptureRowErrors field to the Product row type.
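A minimal sketch of raising that limit, assuming MaxRowsLogged is set on the error output port in the same way as MaxRowsBeforeError in the example above (the value is illustrative only):
source.ErrorOutput.MaxRowsLogged = 25;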
The aggregated error row counts are included in the worker system totals with AggregateErrorOutputRows=2.
2018-02-06 01:29:20.4088 | INFO | /Stage Product | System.Statistics.Ending | Started=2018-02-06T01:29:20Z Completed=2018-02-06T01:29:20Z Duration=0:00:00:00.2240128 AggregateErrorOutputRows=2 AggregateOutputRows=5 AggregateWorkersCompleted=3 PeakPagedMemBytes=104632320 PeakVirtualMemBytes=18566529024 PeakWorkingSetBytes=52445184 UserProcessorTime=00:00:00.608 TotalProcessorTime=00:00:00.795
Rejecting Examples
When you want to retain any error rows for further processing, you link the error output port to a downstream worker. You can also add a CaptureRowErrors field to the row type to get error details for each error row.
Below are examples of how the row error facilities can handle a number of use cases, to meet different requirements. Put together, the examples create the following dataflow (all arrows send Product rows, except the StringRowErrors arrow; purple arrows are from error output ports):
The individual parts of this example are described below; the source for the combined example is also available in the Dataflow Row Errors Reject Example.
Note
The specific workers used in these examples can of course be replaced with other workers, depending on requirements.
Dataflow Row with Error Details
The examples use the following row definition. Note that we've added a public CaptureRowErrors CaptureRowErrors; field to retain error details:
[DelimitedRecord(",")] // Needed for reading CSV file
public class Product
{
public int ProductId;
public string ProductName;
public double ProductWeight;
[FieldHidden] // Member not loaded from CSV file
public CaptureRowErrors CaptureRowErrors;
}
StringRowErrors into Database Table
This CSV file will be loaded. The first line has too many entries:
0,Accelerated,4.2,Motor,Electrical
1,Nitro,13.5
2,Winter Warmer,16
3,Plus Pro,19
The FileHelperFileSource<TOutput> source worker reads the CSV file. It will convert the incorrectly formatted first row to a StringRowErrors instance, and reject it to its error output port. The database table automatically populates a DATETIME column with the current time:
// Source worker uses a StringRowErrors error output type, since no original
// strongly typed row is available. If multiple workers use StringRowErrors,
// they could also be combined and inserted into a single table:
var source = new FileHelperFileSource<Product>(ws, "Read Product"
    , @"Src/RejectErrorRows/Product.csv");

// Any errors are inserted directly into a database table:
source.ErrorOutput.Link.AdbInsertTarget<StringRowErrors>(
    "Insert Read Product Errors"
    , acs.CreateConnectionBuilder()
    , "dbo.StringRowErrors"
    );

// Link source to downstream workers...

/* Assumes the following table already exists:
CREATE TABLE [dbo].[StringRowErrors]
(
    [RowErrorRow] NTEXT NULL,
    [RowErrorDateTime] DATETIME DEFAULT (Getdate()),
    [RowErrorLocator] NTEXT NULL,
    [RowErrorMessage] NTEXT NULL,
    [RowErrorRowNumber] BIGINT NULL,
    [RowErrorColumnName] NCHAR(128) NULL,
    [RowErrorException] NTEXT NULL
)
*/
Note
More elaborate meta-data schemes are often useful, both for errors and regular data rows. For example, create a unique identifier with NewGuid() for each worker system execution, and populate a database column, e.g. on each row, with that value. This makes it easy to filter on or join rows from one particular execution, and RowErrorLocator allows querying on specific workers and hierarchies.
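As an illustration, here is a minimal sketch of that approach, reusing the RowActionTransform1 pattern from these examples; upstreamOutput stands in for whichever output port should be tagged, and the ExecutionId field (plus a matching database column) is an assumption rather than part of the example schema:
// One identifier per worker system execution:
var executionId = Guid.NewGuid();

// Stamp each row with the execution identifier before it reaches any targets:
var tagExecution = upstreamOutput.Link.RowActionTransform1("Tag ExecutionId"
    , row =>
    {
        row.ExecutionId = executionId;
        return TransformRowTreatment.Send;
    });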
Reject Incoming Row
This transform rejects some rows from its input to its error output port, which will (later) be linked to a UnionAllTransform<TInputOutput>:
// Transform worker uses the incoming Product rows also as the error output
// rows, which will be linked to 'unionErrors' further down:
var trx1 = source.Output.Link.RowActionTransform1("Transform1"
    , row => row.ProductWeight <= 10
        ? TransformRowTreatment.Reject : TransformRowTreatment.Send);
Reject to Multiple Workers
This section demonstrates how a transform can reject rows from its input to one or more (in this case two additional RowActionTransform<TInputOutputError>) transforms that check for data issues. Corrected rows are sent to data output ports and will (later) be combined with other data rows, while uncorrectable error rows go to error ports and will (later) be combined with other strongly typed Product error rows.
// Transform worker uses the incoming Product rows also as the error output
// rows:
var trx2 = trx1.Output.Link.RowActionTransform1("Transform2"
    , row => row.ProductWeight <= 20
        ? TransformRowTreatment.Reject : TransformRowTreatment.Send);

// Some errors are corrected, and sent to the output port. Uncorrectable
// rows are sent to the error output port, in this case for further
// checks:
var trx2CheckA = trx2.ErrorOutput.Link.RowActionTransform1("Transform2 CheckA"
    , row =>
    {
        if (row.ProductWeight <= 15)
        {
            row.ProductWeight = 30;
            row.CaptureRowErrors.RowErrors?.Clear(); // Optional
            return TransformRowTreatment.Send;
        }
        return TransformRowTreatment.Reject;
    });

// Additional errors are corrected, and sent to the output port. Uncorrectable
// rows are sent to the error output port, which will be linked to
// 'unionErrors' further down:
var trx2CheckB = trx2CheckA.ErrorOutput.Link.RowActionTransform1("Transform2 CheckB"
    , row =>
    {
        if (row.ProductWeight <= 18)
        {
            row.ProductWeight = 35;
            return TransformRowTreatment.Send;
        }
        return TransformRowTreatment.Reject;
    });
trx2CheckB.Output.ClearRowErrors = true; // Optional
trx2CheckA and trx2CheckB use two different ways to clear the error details from the corrected rows. Clearing them is useful if there are other workers that might also create error rows, which could otherwise lead to corrected errors being rechecked, or stored to a row error repository.
Insert Data and Error Rows
All Product data rows (including corrected rows) are combined and inserted into one table, and all Product error rows are combined and inserted into a second table:
// Combine and insert data and corrected error rows:
CreateUnionAndInsert("Product Data", "dbo.Product"
    , trx2.Output, trx2CheckA.Output, trx2CheckB.Output);

// Combine and insert error rows:
CreateUnionAndInsert("Product Errors", "dbo.ProductError"
    , trx1.ErrorOutput, trx2CheckB.ErrorOutput);

// Local helper function to union and insert rows into a database table
void CreateUnionAndInsert(string name, string tableName,
    params OutputPortBase<Product>[] inputsFrom)
{
    if (inputsFrom.Length == 0)
        return;

    inputsFrom[0].Worker.Parent.GetDownstreamFactory<Product>()
        .UnionAllTransform("Union " + name, inputsFrom)
        .Output.Link
        .AdbInsertTarget("Insert " + name, acs.CreateConnectionBuilder(), tableName);
}