Search Results for

    Show / Hide Table of Contents

    Dataflow Row Errors Reject Example

    This is the combined example that is discussed in Rejecting Examples:

        using actionETL;
        using actionETL.Adb;
        using actionETL.Adb.SqlClientExternal;
        using actionETL.FileHelper;
        using FileHelpers;
        using System;
    
        public static partial class RejectErrorRows
        {
            [DelimitedRecord(",")] // Needed for reading CSV file
            public class Product
            {
                public int ProductId;
                public string ProductName;
                public double ProductWeight;
    
                [FieldHidden] // Member not loaded from CSV file
                public CaptureRowErrors CaptureRowErrors;
            }
    #pragma warning restore CA1051 // Do not declare visible instance fields
    
            private static SystemOutcomeStatus RunExample()
            {
                var workerSystem = new WorkerSystem("Stage Product")
                    .Root(ws =>
                    {
                        var acs = AdbSqlClientProvider.Get()
                            .CreateConnectionString(ws.Config["SqlServer"]);
    
                        // Source worker uses a StringRowError error output type, since no original
                        // strongly typed row is available. If multiple workers uses StringRowErrors,
                        // they also could be combined and inserted into a single table:
                        var source = new FileHelperFileSource<Product>(ws, "Read Product"
                            , @"Src/RejectErrorRows/Product.csv");
                        
                        // Any errors are inserted directly into a database table:
                        source.ErrorOutput.Link.AdbInsertTarget<StringRowErrors>(
                              "Insert Read Product Errors"
                            , acs.CreateConnectionBuilder()
                            , "dbo.StringRowErrors"
                            );
    
                        // Link source to downstream workers...
    
                        /* Assumes the following table already exist:
                        CREATE TABLE [dbo].[StringRowErrors]
                        (
                            [RowErrorRow] NTEXT NULL,
                            [RowErrorDateTime] DATETIME DEFAULT (Getdate()),
    
                            [RowErrorLocator] NTEXT NULL,
                            [RowErrorMessage] NTEXT NULL,
                            [RowErrorRowNumber] BIGINT NULL,
                            [RowErrorColumnName] NCHAR(128) NULL,
                            [RowErrorException] NTEXT NULL
                        )
                        */
    
    
                        // Transform worker uses the incoming Product rows also as the error output
                        // rows, which will be linked to 'unionErrors' further down:
                        var trx1 = source.Output.Link.RowActionTransform1("Transform1"
                            , row => row.ProductWeight <= 10
                                ? TransformRowTreatment.Reject : TransformRowTreatment.Send);
    
    
                        // Transform worker uses the incoming Product rows also as the error output
                        // rows:
                        var trx2 = trx1.Output.Link.RowActionTransform1("Transform2"
                            , row => row.ProductWeight <= 20
                                ? TransformRowTreatment.Reject : TransformRowTreatment.Send);
                    
                        // Some errors are corrected, and sent to the output port. Uncorrectable
                        // rows are sent to the error output port, in this case for further
                        // checks:
                        var trx2CheckA = trx2.ErrorOutput.Link.RowActionTransform1("Transform2 CheckA"
                            , row =>
                            {
                                if (row.ProductWeight <= 15)
                                {
                                    row.ProductWeight = 30;
                                    row.CaptureRowErrors.RowErrors?.Clear(); // Optional
                                    return TransformRowTreatment.Send;
                                }
                                return TransformRowTreatment.Reject;
                            });
                  
                        // Additional errors are corrected, and sent to the output port. Uncorrectable
                        // rows are sent to the error output port, which will be linked to
                        // 'unionErrors' further down:
                        var trx2CheckB = trx2CheckA.ErrorOutput.Link.RowActionTransform1("Transform2 CheckB"
                            , row =>
                            {
                                if (row.ProductWeight <= 18)
                                {
                                    row.ProductWeight = 35;
                                    return TransformRowTreatment.Send;
                                }
                                return TransformRowTreatment.Reject;
                            });
                        trx2CheckB.Output.ClearRowErrors = true; // Optional
    
    
                        // Combine and insert data and corrected error rows:
                        CreateUnionAndInsert("Product Data", "dbo.Product"
                            , trx2.Output, trx2CheckA.Output, trx2CheckB.Output);
    
                        // Combine and insert error rows:
                        CreateUnionAndInsert("Product Errors", "dbo.ProductError"
                            , trx1.ErrorOutput, trx2CheckB.ErrorOutput);
    
    
                        // Local helper function to union and insert rows into a database table
                        void CreateUnionAndInsert(string name, string tableName,
                            params OutputPortBase<Product>[] inputsFrom)
                        {
                            if (inputsFrom.Length == 0)
                                return;
    
                            inputsFrom[0].Worker.Parent.GetDownstreamFactory<Product>()
                            .UnionAllTransform("Union " + name, inputsFrom)
                            
                            .Output.Link
                            .AdbInsertTarget("Insert " + name, acs.CreateConnectionBuilder(), tableName);
                        }
    
                        /* Assumes the following tables already exist:
                        CREATE TABLE [dbo].[Product]
                        (
                            [ProductId] INT NOT NULL PRIMARY KEY,
                            [ProductName] NCHAR(50) NOT NULL,
                            [ProductWeight] FLOAT NULL
                        )
    
                        CREATE TABLE [dbo].[ProductError]
                        (
                            [ProductId] INT NOT NULL PRIMARY KEY,
                            [ProductName] NCHAR(50) NOT NULL,
                            [ProductWeight] FLOAT NULL,
    
                            [RowErrorDateTime] DATETIME DEFAULT (Getdate()),
    
                            [RowErrorLocator] NTEXT NULL,
                            [RowErrorMessage] NTEXT NULL,
                            [RowErrorRowNumber] BIGINT NULL,
                            [RowErrorColumnName] NCHAR(128) NULL,
                            [RowErrorExceptionString] NTEXT NULL
                        )
                        */
                    });
                return workerSystem.Start();
            }
        }
    
    In This Article
    Back to top Copyright © 2023 Envobi Ltd