    Dataflow Row Errors

    Apart from the data input and output ports, dataflow workers can optionally also have one or more error output ports, i.e. ErrorOutputPort<TError>. When a worker experiences a problem with a row, e.g. a column having the wrong content that the worker can't fix, then it should send the faulty row to an error output port. This enables:

    • Tracking and logging the number of error rows created per error port, per worker, and aggregated up through all ancestors to the worker system
    • Logging the contents of error rows to the logging system
    • Sending the error rows to a downstream worker for further processing, e.g. to correct them, or to store them in a database
    • Setting limits on the maximum number of error rows before logging warnings and errors, and before failing the worker

    Error output ports work similarly to data output ports, except that:

    • They are only used for rejecting error rows, since they affect row error counts and are logged as errors (non-error rows should always use regular data output ports)
    • Linking to an input port is optional. Rows sent to an unlinked error output port will still be (potentially) logged to the logging system, and maximum error count limits will still be respected.
    • There are only synchronous error output methods, which are particularly easy to use. These are most commonly used when developing custom dataflow workers, or when using dataflow workers that take callbacks as parameters, e.g. RowsActionTransform<TInputOutputError>:
      • SendErrorRow(...) overloads both log the content of the error row and send the row to any downstream worker
      • LogRowError(...) overloads only log the content of the error row to the logging system
    • They are automatically completed successfully when the worker completes, so it's not required to call SendSucceeded() or SendError() on them.
      • It is however best practice to call SendSucceeded() or SendError() on the error port if the worker will continue running after the last possible error port row has been sent.
    • They always have Full buffering.
      • Note: To avoid running out of memory where a very large number of error rows might be produced, limit the number of allowed error rows (which is also the default) using MaxRowsSent and/or MaxRowsBeforeError, as sketched below.
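
    For example, these limits can be set on a worker's error output port when constructing the dataflow. A minimal sketch ('source' stands for any worker with an error output port, the values are arbitrary, and MaxRowsSent is assumed to be a plain settable property in the same way as the MaxRowsBeforeError property used in the examples below):

    // Assumption: both limits are settable properties on the error output
    // port; see their reference documentation for the exact semantics.
    source.ErrorOutput.MaxRowsSent = 100;
    source.ErrorOutput.MaxRowsBeforeError = 20;
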
    Note

    In row error handling code, always use SendErrorRow(...) instead of LogRowError(...) where possible (since the port can be left unlinked anyway), except when a meaningful error row is neither available nor possible to create.

    Important

    If the SendErrorRow(...) or LogRowError(...) methods return a failure status, you must fail your worker, and use the returned ProgressStatus as the error status and message (calling a ToOutcomeStatus(Boolean) overload if needed).

    Some examples:

    • A FileHelperFileSource<TOutput> source worker that extracts a CSV file might encounter a badly formatted row, and reject the bad CSV row (as a string, with extra information) to its error output port
    • A RowActionTransform<TInputOutputError> transform worker that filters incoming rows:
      • Allows the callback supplied by the library user to reject any incoming row to the error output port
      • Automatically rejects to the error output port any rows where the callback supplied by the library user throws an exception
    • A target worker for inserting rows into a database could reject to the error output port any rows that generated a database exception

    On the other hand, a random number generator source worker would never generate a bad row, and would therefore not have any error output ports configured (and any exception would fail the worker).

    Logging Only Example

    Rows sent to an error output port can be automatically logged to the logging system, including showing the row contents. This is often sufficient for troubleshooting, and can remove the need to add workers that explicitly store error rows.

    This CSV source file will be loaded by the ETL application below:

    0,Accelerated,4.2,Motor,Electrical
    1,Nitro,13.5
    2,Winter Warmer,16
    3,Plus Pro,19
    

    Only the second and third lines will be inserted into the database; the other lines have errors and will be rejected:

    • First line has too many columns, and will be logged and then discarded
    • Last line has ProductWeight == 0, which the callback below specifies should be logged and discarded

    By default, the row errors would have stopped the load (logging the first error), but here we allow up to 20 row errors in the source worker, and up to 5 in the transform.

    using actionETL;
    using actionETL.Adb;
    using actionETL.Adb.SqlClientExternal;
    using actionETL.FileHelper;
    using FileHelpers;
    using System;
    
    public static partial class LogErrorRows
    {
        [DelimitedRecord(",")]
        public class Product
        {
            public int ProductId { get; set; }
            public string ProductName { get; set; }
            public double ProductWeight { get; set; }
        }
    
        private static SystemOutcomeStatus RunExample()
        {
            var workerSystem = new WorkerSystem("Stage Product")
                .Root(ws =>
                {
                    var provider = AdbSqlClientProvider.Get();
    
                    var source = new FileHelperFileSource<Product>(ws, "Read Product"
                        , @"Src/LogErrorRows/Product.csv");
                    source.ErrorOutput.MaxRowsBeforeError = 20;
    
                    var transform = source.Output.Link.RowActionTransform1(
                          "Round ProductWeight", row =>
                        {
                            row.ProductWeight = Math.Round(row.ProductWeight, 2);
                            // Reject and log products with missing weights:
                            return row.ProductWeight <= 0.0
                                ? TransformRowTreatment.Reject : TransformRowTreatment.Send;
                        });
                    transform.ErrorOutput.MaxRowsBeforeError = 5;
    
                    transform.Output.Link.AdbInsertTarget("Insert Product"
                        , provider.CreateConnectionBuilder(ws.Config["SqlServer"])
                        , "dbo.Product");
                });
            return workerSystem.Start();
        }
    }
    
    /* The example assumes the following table already exists:
        CREATE TABLE [dbo].[Product]
        (
            [ProductId] INT NOT NULL PRIMARY KEY,
            [ProductName] NCHAR(50) NOT NULL,
            [ProductWeight] FLOAT NULL
        )
     */
    

    Both error rows in the above example will appear in detail in the logging output, greatly simplifying troubleshooting and often removing the need to add additional workers for error row handling.

    For the "Read Product" source, the first CSV line logs:

    • A warning with:
      • The CSV file line number: RowNumber=0
      • The CSV row contents: Row='0,Accelerated,13.9,Motor,Electrical'
      • An exception, describing the issue: BadUsageException: Line: 1 Column: 14. Delimiter ',' found after the last field 'ProductWeight'
    • The worker row counts including: [ErrorOutput]=1 rows
    2018-02-06 01:29:20.2538 | WARN | /Stage Product/Read Product.ErrorOutputs[ErrorOutput] | ErrorOutputPort.RowLogged | RowNumber=0. Row='0,Accelerated,13.9,Motor,Electrical' :EXCEPTION OCCURRED: FileHelpers.BadUsageException: Line: 1 Column: 14. Delimiter ',' found after the last field 'ProductWeight' (the file is wrong or you need to add a field to the record class)
    
    2018-02-06 01:29:20.3908 | INFO | /Stage Product/Read Product | Worker.Statistics | Started=2018-02-06T01:29:20Z Completed=2018-02-06T01:29:20Z Duration=0:00:00:00.1660095 Outputs[Output]=3 rows, 18 rows/s, active 0.148s, active 20 rows/s, BufferExhaustedCount 0 ErrorOutputs[ErrorOutput]=1 rows, 6 rows/s, active 0.117s, active 8 rows/s, BufferExhaustedCount 0
    

    For the "Round ProductWeight" transform, the last CSV line logs:

    • A warning with:
      • The row number: RowNumber=2
      • The row contents: ProductId='3' ProductName='Plus Pro' ProductWeight='0'
    • The worker row counts including: [ErrorOutput]=1 rows
    2018-02-06 01:29:20.2698 | WARN | /Stage Product/Round ProductWeight.ErrorOutputs[ErrorOutput] | ErrorOutputPort.RowLogged | RowNumber=2. ProductId='3' ProductName='Plus Pro' ProductWeight='0' 
    
    2018-02-06 01:29:20.3908 | INFO | /Stage Product/Round ProductWeight | Worker.Statistics | Started=2018-02-06T01:29:20Z Completed=2018-02-06T01:29:20Z Duration=0:00:00:00.035002 Inputs[Input]=3 rows, 85 rows/s, active 0.145s, active 20 rows/s, BufferExhaustedCount 2 Outputs[Output]=2 rows, 57 rows/s, active 0.121s, active 16 rows/s, BufferExhaustedCount 0 ErrorOutputs[ErrorOutput]=1 rows, 28 rows/s, active 0.114s, active 8 rows/s, BufferExhaustedCount 0
    
    Note
    • By default, only the first 10 error rows are logged with row contents, see MaxRowsLogged (a sketch of raising this limit follows this note).
    • Since we're not linking any error output ports, we have no use for capturing error details, and therefore haven't added any CaptureRowErrors field to the Product row type.
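
    If more error rows should be logged with their contents, this limit can presumably be raised; a minimal sketch, assuming MaxRowsLogged is a settable property on the error output port, alongside the MaxRowsBeforeError property used in the example above:

    // Assumption: MaxRowsLogged sits on the error output port, like
    // MaxRowsBeforeError earlier in this example:
    source.ErrorOutput.MaxRowsLogged = 50;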

    The aggregated error row counts are included in the worker system totals with AggregateErrorOutputRows=2.

    2018-02-06 01:29:20.4088 | INFO | /Stage Product | System.Statistics.Ending | Started=2018-02-06T01:29:20Z Completed=2018-02-06T01:29:20Z Duration=0:00:00:00.2240128 AggregateErrorOutputRows=2 AggregateOutputRows=5 AggregateWorkersCompleted=3 PeakPagedMemBytes=104632320 PeakVirtualMemBytes=18566529024 PeakWorkingSetBytes=52445184 UserProcessorTime=00:00:00.608 TotalProcessorTime=00:00:00.795
    

    Rejecting Examples

    When you want to retain any error rows for further processing, you link the error output port to a downstream worker. You can also add a CaptureRowErrors field to the row type to get error details for each error row.

    Below are examples of how the row error facilities can handle a number of use cases, to meet different requirements. Put together, the examples create the following dataflow (all arrows send <Product> rows, except the <StringRowErrors> arrow; purple arrows are from error output ports):

    (Diagram: Reject Dataflow Example)

    The individual parts of this example are described below; the source for the combined example is also available in the Dataflow Row Errors Reject Example.

    Note

    The specific workers used in these examples can of course be replaced with other workers, depending on requirements.

    Dataflow Row with Error Details

    The examples use the following row definition. Note that we've added a public CaptureRowErrors CaptureRowErrors; field to retain error details:

    [DelimitedRecord(",")] // Needed for reading CSV file
    public class Product
    {
        public int ProductId;
        public string ProductName;
        public double ProductWeight;
    
        [FieldHidden] // Member not loaded from CSV file
        public CaptureRowErrors CaptureRowErrors;
    }
    

    StringRowErrors into Database Table

    This CSV file will be loaded. The first line has too many entries:

    0,Accelerated,4.2,Motor,Electrical
    1,Nitro,13.5
    2,Winter Warmer,16
    3,Plus Pro,19
    

    The FileHelperFileSource<TOutput> source worker reads the CSV file. It will convert the incorrectly formatted first row to a StringRowErrors instance, and reject it to its error output port. The database table automatically populates a DATETIME column with the current time:

    // Source worker uses a StringRowErrors error output type, since no original
    // strongly typed row is available. If multiple workers use StringRowErrors,
    // they could also be combined and inserted into a single table:
    var source = new FileHelperFileSource<Product>(ws, "Read Product"
        , @"Src/RejectErrorRows/Product.csv");
    
    // Any errors are inserted directly into a database table:
    source.ErrorOutput.Link.AdbInsertTarget<StringRowErrors>(
          "Insert Read Product Errors"
        , acs.CreateConnectionBuilder()
        , "dbo.StringRowErrors"
        );
    
    // Link source to downstream workers...
    
    /* Assumes the following table already exists:
    CREATE TABLE [dbo].[StringRowErrors]
    (
        [RowErrorRow] NTEXT NULL,
        [RowErrorDateTime] DATETIME DEFAULT (Getdate()),
    
        [RowErrorLocator] NTEXT NULL,
        [RowErrorMessage] NTEXT NULL,
        [RowErrorRowNumber] BIGINT NULL,
        [RowErrorColumnName] NCHAR(128) NULL,
        [RowErrorException] NTEXT NULL
    )
    */
    
    Note

    More elaborate meta-data schemes are often useful, both for error rows and regular data rows. For example, create a unique identifier with NewGuid() for each worker system execution, and populate a database column on each row with that value. This makes it easy to filter on or join rows from one particular execution, while RowErrorLocator allows querying on specific workers and hierarchies.
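
    As a rough sketch of such a scheme (the ExecutionId row field, the matching table column, and the upstream 'products' output port are all hypothetical additions, not part of this article's examples), a small transform could stamp every row before it reaches the insert target:

    // Hypothetical: 'ExecutionId' is an extra Guid field added to the row
    // type, with a matching UNIQUEIDENTIFIER column in the target table.
    var executionId = Guid.NewGuid(); // One value per worker system execution

    var stamped = products.Link.RowActionTransform1("Stamp ExecutionId"
        , row =>
        {
            row.ExecutionId = executionId;
            return TransformRowTreatment.Send;
        });
    // stamped.Output would then be linked to the insert target as usual.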

    Reject Incoming Row

    This transform rejects some rows from its input to its error output port, which will (later) be linked to a UnionAllTransform<TInputOutput>:

    // Transform worker uses the incoming Product rows also as the error output
    // rows, which will be linked to 'unionErrors' further down:
    var trx1 = source.Output.Link.RowActionTransform1("Transform1"
        , row => row.ProductWeight <= 10
            ? TransformRowTreatment.Reject : TransformRowTreatment.Send);
    

    Reject to Multiple Workers

    This section demonstrates how a transform can reject rows from its input to one or more downstream transforms (in this case two additional RowActionTransform<TInputOutputError> transforms) that check for data issues. Corrected rows are sent to data output ports and will (later) be combined with other data rows, while uncorrectable error rows go to error ports and will (later) be combined with other strongly typed Product error rows.

    // Transform worker uses the incoming Product rows also as the error output
    // rows:
    var trx2 = trx1.Output.Link.RowActionTransform1("Transform2"
        , row => row.ProductWeight <= 20
            ? TransformRowTreatment.Reject : TransformRowTreatment.Send);
    
    // Some errors are corrected, and sent to the output port. Uncorrectable
    // rows are sent to the error output port, in this case for further
    // checks:
    var trx2CheckA = trx2.ErrorOutput.Link.RowActionTransform1("Transform2 CheckA"
        , row =>
        {
            if (row.ProductWeight <= 15)
            {
                row.ProductWeight = 30;
                row.CaptureRowErrors.RowErrors?.Clear(); // Optional
                return TransformRowTreatment.Send;
            }
            return TransformRowTreatment.Reject;
        });
    
    // Additional errors are corrected, and sent to the output port. Uncorrectable
    // rows are sent to the error output port, which will be linked to
    // 'unionErrors' further down:
    var trx2CheckB = trx2CheckA.ErrorOutput.Link.RowActionTransform1("Transform2 CheckB"
        , row =>
        {
            if (row.ProductWeight <= 18)
            {
                row.ProductWeight = 35;
                return TransformRowTreatment.Send;
            }
            return TransformRowTreatment.Reject;
        });
    trx2CheckB.Output.ClearRowErrors = true; // Optional
    

    trx2CheckA and trx2CheckB use two different ways to clear the error details from the corrected rows. Clearing them is useful if there are other workers that might also create error rows; without clearing, already corrected errors could be rechecked, or stored in a row error repository.
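
    As an illustration of why clearing matters: a downstream transform that re-checks rows could otherwise reject already corrected rows again. A hedged sketch (the 'Recheck' transform and its upstream output port are hypothetical; treating CaptureRowErrors.RowErrors as a nullable collection follows its use in trx2CheckA above):

    // Hypothetical re-check: rows whose error details were cleared above pass
    // straight through, while rows still carrying error details are rejected.
    var recheck = someOutput.Link.RowActionTransform1("Recheck"
        , row => row.CaptureRowErrors.RowErrors?.Count > 0
            ? TransformRowTreatment.Reject : TransformRowTreatment.Send);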

    Insert Data and Error Rows

    All Product data (including corrected) rows are combined and inserted into one table, and all Product error rows are combined and inserted into a second table:

    // Combine and insert data and corrected error rows:
    CreateUnionAndInsert("Product Data", "dbo.Product"
        , trx2.Output, trx2CheckA.Output, trx2CheckB.Output);
    
    // Combine and insert error rows:
    CreateUnionAndInsert("Product Errors", "dbo.ProductError"
        , trx1.ErrorOutput, trx2CheckB.ErrorOutput);
    
    
    // Local helper function to union and insert rows into a database table
    void CreateUnionAndInsert(string name, string tableName,
        params OutputPortBase<Product>[] inputsFrom)
    {
        if (inputsFrom.Length == 0)
            return;
    
        inputsFrom[0].Worker.Parent.GetDownstreamFactory<Product>()
            .UnionAllTransform("Union " + name, inputsFrom)
            .Output.Link
            .AdbInsertTarget("Insert " + name, acs.CreateConnectionBuilder(), tableName);
    }
    

    See Also

    • Dataflow
      • Dataflow Rows
    • Troubleshooting