Search Results for

    Show / Hide Table of Contents

    Custom Non-Adb Development

    In some cases there might not be a suitable Adb worker available, e.g. because no Adb provider exists yet for a particular database, or because the existing Adb classes do not support a particular use case.

    For these scenarios, one can create and execute standard .NET database queries and commands directly, using any .NET database provider, including integrating with the actionETL dataflow.

    Note that while fully functional, the resulting custom code and custom worker lack many of the features that the Adb workers provide out of the box.

    Ad-hoc Data Source

    This example demonstrates reading a database result set into the dataflow, using a .NET database provider directly (as opposed to using AdbDataReaderSource<TOutput>).

    The ad-hoc approach in this example is suitable if the code is only ever used in a single place. If there is any reuse, it is much better to create a reusable worker.

    While this example uses the OleDb .NET provider (which can access dozens of different types of databases, but is Windows only), the code would look very similar for other database providers.

    using actionETL;
    using actionETL.Logging;
    using System;
    using System.Data.OleDb;
    using System.Runtime.InteropServices;
    
    /// <summary>
    /// Ad-hoc example that reads a database result set into the dataflow with an
    /// <c>ActionSource</c>, using the OleDb .NET provider directly.
    /// </summary>
    public static partial class AdHocOleDbSource
    {
        /// <summary>Dataflow row populated from one row of the query result.</summary>
        private sealed class Product
        {
            public int Id;
            public string Name;
            public double Weight;
        }
    
        /// <summary>
        /// Creates a worker system with one source worker that runs the query and sends
        /// each result row to a collection target, starts it, and throws on failure.
        /// </summary>
        public static void RunExample()
        {
            const string dbSetting = "OleDbServer";
    
            new WorkerSystem()
                .Root(root =>
                {
                    new ActionSource<Product>(root, "Read Product", async s =>
                    // Any exception will be caught and logged by the worker
                    {
                        // Fetch connection details
                        var cs = root.Config[dbSetting];
    
                        // The using blocks ensure the connection, command and reader are
                        // disposed on every exit path: success, error return, or exception
                        using (var oleDbConnection = new OleDbConnection(cs))
                        {
                            // Create a command, and add one parameter
                            using (var oleDbCommand = new OleDbCommand(
                                @"SELECT Id, Name, Weight
                                  FROM Product
                                  WHERE Weight > ?; "
                                , oleDbConnection))
                            {
                                // OleDb binds parameters by position to the '?' placeholders;
                                // the name passed here is not used for matching
                                oleDbCommand.Parameters.AddWithValue("@Weight", 5.5);
    
                                await oleDbConnection.OpenAsync().ConfigureAwait(false);
                                using (var reader = await oleDbCommand
                                    .ExecuteReaderAsync().ConfigureAwait(false))
                                {
                                    // Synchronous slightly faster with narrow rows: 
                                    //     reader.Read() 
                                    // Asynchronous more scalable with many workers in parallel:
                                    //     await reader.ReadAsync().ConfigureAwait(false)
                                    while (reader.Read())
                                    {
                                        // Create a dataflow row for each database row
                                        var row = new Product
                                        {
                                            Id = reader.GetInt32(0),
                                            Name = reader.GetString(1),
                                            Weight = reader.GetDouble(2)
                                        };
    
                                        // Try the synchronous send first; only when it returns
                                        // false is the asynchronous send awaited. If that also
                                        // returns false the worker fails.
                                        if (s.Output.TrySendRow(row)
                                                || await s.Output.SendRowAsync(row).ConfigureAwait(false))
                                            continue;
                                        return OutcomeStatus.Error(
                                            ALogCategory.OutputPortUnexpectedNoDemand);
                                    }
                                    s.Output.SendSucceeded();
                                }
                            }
                        }
    
                        return OutcomeStatus.Succeeded;
                    })
    
    
                    // Use source data with dataflow transforms or targets
                    .Output.Link.CollectionTarget("Collect Products");
                })
                .Start()
                .ThrowOnFailure();
    
            /* The example assumes the following table already exists
               (Weight is read with GetDouble, i.e. it must be a double-precision column):
            CREATE TABLE dbo.Product
            (
                Id int,
                Name nvarchar(50),
                Description nvarchar(1000),
                Weight float
            )
            */
        }
    }
    

    Custom Data Source Worker

    This example performs the same work as the above Ad-hoc Data Source, reading a database result set into the dataflow, using a .NET database provider directly (as opposed to using AdbDataReaderSource<TOutput>).

    Here, however, we create a reusable worker, which makes it easy both to use it in multiple places and to maintain the code by avoiding duplication.

    using actionETL;
    using actionETL.Logging;
    using System;
    using System.Data.Common;
    using System.Data.OleDb;
    using System.Runtime.InteropServices;
    using System.Threading.Tasks;
    
    /// <summary>
    /// Example of a reusable dataflow source worker that reads a database result set
    /// using the OleDb .NET provider directly, plus a method demonstrating its use.
    /// </summary>
    public static partial class CustomOleDbSourceExample
    {
        /// <summary>Dataflow row populated from one row of the query result.</summary>
        private sealed class Product
        {
            public int Id;
            public string Name;
            public double Weight;
        }
    
    
        // Create a reusable dataflow source worker
        /// <summary>
        /// Source worker that executes an <see cref="OleDbCommand"/> and sends one
        /// <typeparamref name="TOutput"/> row downstream per result set row.
        /// </summary>
        /// <typeparam name="TOutput">The dataflow row type.</typeparam>
        public class CustomOleDbSource<TOutput>
            : SourceBase<CustomOleDbSource<TOutput>,TOutput>
            where TOutput : class
        {
            private readonly string _connectionString;
            private readonly Func<DbDataReader, TOutput> _createRowFunc;
            private readonly OleDbCommand _oleDbCommand;
    
            /// <summary>Initializes the source worker.</summary>
            /// <param name="workerParent">Parent of the worker; passed to the base class.</param>
            /// <param name="workerName">Name of the worker; passed to the base class.</param>
            /// <param name="isStartableFunc">Passed to the base class; the overload without
            /// this parameter passes <c>null</c>.</param>
            /// <param name="connectionStringConfigurationName">Name of the configuration
            /// setting that holds the connection string.</param>
            /// <param name="oleDbCommand">Command to execute. Its <c>Connection</c> is
            /// assigned by the worker when it runs. The worker does not dispose it.</param>
            /// <param name="createRowFunc">Creates a dataflow row from the reader, which is
            /// positioned on the current result set row.</param>
            /// <exception cref="ArgumentNullException"><paramref name="oleDbCommand"/> or
            /// <paramref name="createRowFunc"/> is <c>null</c>.</exception>
            public CustomOleDbSource(WorkerParent workerParent, string workerName
                , Func<bool> isStartableFunc
                , string connectionStringConfigurationName
                , OleDbCommand oleDbCommand
                , Func<DbDataReader, TOutput> createRowFunc
                )
                : base(workerParent, workerName, isStartableFunc)
            {
                // Fail at construction with a descriptive exception, rather than with a
                // NullReferenceException when the worker later runs
                if (oleDbCommand == null)
                    throw new ArgumentNullException(nameof(oleDbCommand));
                if (createRowFunc == null)
                    throw new ArgumentNullException(nameof(createRowFunc));
    
                _connectionString = WorkerSystem.Config[connectionStringConfigurationName];
                _oleDbCommand = oleDbCommand;
                _createRowFunc = createRowFunc;
            }
    
            /// <summary>
            /// Initializes the source worker, passing <c>null</c> as the
            /// <c>isStartableFunc</c> of the other constructor.
            /// </summary>
            /// <param name="workerParent">Parent of the worker; passed to the base class.</param>
            /// <param name="workerName">Name of the worker; passed to the base class.</param>
            /// <param name="connectionStringSettingsName">Name of the configuration
            /// setting that holds the connection string.</param>
            /// <param name="command">Command to execute.</param>
            /// <param name="createRowFunc">Creates a dataflow row from the reader.</param>
            /// <exception cref="ArgumentNullException"><paramref name="command"/> or
            /// <paramref name="createRowFunc"/> is <c>null</c>.</exception>
            public CustomOleDbSource(WorkerParent workerParent, string workerName
                , string connectionStringSettingsName
                , OleDbCommand command
                , Func<DbDataReader, TOutput> createRowFunc
                )
                : this(workerParent, workerName, null, connectionStringSettingsName
                      , command, createRowFunc)
            { }
    
            /// <summary>
            /// Opens the connection, executes the command, and sends one row downstream
            /// per result set row.
            /// </summary>
            /// <returns><c>OutcomeStatus.Succeeded</c> when all rows were sent, or an error
            /// status when a row could not be sent.</returns>
            protected async override Task<OutcomeStatus> RunAsync()
            // Any exception will be caught and logged by the base class worker
            {
                // The using blocks ensure the connection and reader are disposed on
                // every exit path: success, error return, or exception
                using (var oleDbConnection = new OleDbConnection(_connectionString))
                {
                    _oleDbCommand.Connection = oleDbConnection;
                    await oleDbConnection.OpenAsync().ConfigureAwait(false);
                    using (var reader = await _oleDbCommand
                        .ExecuteReaderAsync().ConfigureAwait(false))
                    {
                        // Synchronous slightly faster with narrow rows: 
                        //     reader.Read() 
                        // Asynchronous more scalable with many workers in parallel:
                        //     await reader.ReadAsync().ConfigureAwait(false)
                        while (reader.Read())
                        {
                            var row = _createRowFunc(reader);
    
                            // Try the synchronous send first; only when it returns false
                            // is the asynchronous send awaited. If that also returns
                            // false the worker fails.
                            if (Output.TrySendRow(row)
                                    || await Output.SendRowAsync(row).ConfigureAwait(false))
                                continue;
                            return OutcomeStatus.Error(
                                ALogCategory.OutputPortUnexpectedNoDemand);
                        }
                        Output.SendSucceeded();
                    }
                }
    
                return OutcomeStatus.Succeeded;
            }
        }
    
    
        /// <summary>
        /// Creates a worker system that uses <c>CustomOleDbSource</c> to run the query and
        /// send each result row to a collection target, starts it, and throws on failure.
        /// </summary>
        public static void RunExample()
        {
            new WorkerSystem()
                .Root(root =>
                {
                    // Create a command, and add one parameter
                    var oleDbCommand = root.DisposeOnFinished(new OleDbCommand(
                        @"SELECT Id, Name, Weight
                          FROM Product
                          WHERE Weight > ?; "));
                    // OleDb binds parameters by position to the '?' placeholders;
                    // the name passed here is not used for matching
                    oleDbCommand.Parameters.AddWithValue("@Weight", 5.5);
    
                    // Use the source worker created above
                    new CustomOleDbSource<Product>(root, "Read Product"
                        , "OleDbServer", oleDbCommand, dr =>
                        {
                            // Create a dataflow row for each database row
                            return new Product
                            {
                                Id = dr.GetInt32(0),
                                Name = dr.GetString(1),
                                Weight = dr.GetDouble(2)
                            };
                        })
                    
                    // Use source data with dataflow transforms or targets
                    .Output.Link.CollectionTarget("Collect Products");
                })
                .Start()
                .ThrowOnFailure();
    
            /* The example assumes the following table already exists
               (Weight is read with GetDouble, i.e. it must be a double-precision column):
            CREATE TABLE dbo.Product
            (
                Id int,
                Name nvarchar(50),
                Description nvarchar(1000),
                Weight float
            )
            */
        }
    }
    

    See Also

    • Worker System
      • Configuration
    • Workers
    • SQL Database Access
      • Dataflow Columns
      • Custom Adb Development
    In This Article
    Back to top Copyright © 2023 Envobi Ltd