Search Results for

    Show / Hide Table of Contents

    Adb Keep Open Connection

    Regular connections from AdbConnectionBuilder can only be used with a single database worker. This article describes how to use AdbKeepOpenConnectionBuilder, which allows a single connection to be used with multiple workers.

    Common use cases include:

    • Working with a temporary database table across multiple workers
    • Executing multiple database workers inside a single transaction

    Temporary Table Example

    This example uses a single keep-open connection across three workers to:

    • Create a new "#TempProduct" temporary table
    • Insert rows into the new temporary table from a CSV file
    • Run a DELETE query that joins on the new temporary table

    With regular connections, the temporary table would not have been retained across workers.

    using actionETL;
    using actionETL.Adb;
    using actionETL.Adb.SqlClientExternal;
    using actionETL.FileHelper;
    using FileHelpers;
    
    public static partial class KeepOpenTemporaryTable
    {
        [DelimitedRecord(",")]
        private sealed class Product
        {
            public int ProductId2;
        }
    
        public static SystemOutcomeStatus RunExample()
        {
            return new WorkerSystem()
                .Root(ws =>
                {
                    const string tempTable = "#TempProduct";
                    var cb = ws.DisposeOnFinished(AdbSqlClientProvider.Get()
                        .CreateKeepOpenConnectionBuilder(ws.Config["SqlServer"])
                        );
    
    
                    var create = new AdbExecuteNonQueryWorker(ws, "CreateTable"
                        , cb, $@"CREATE TABLE {tempTable} ([ProductId2] [int] NOT NULL)");
    
    
                    var readCSV = new FileHelperFileSource<Product>(ws, "Read CSV"
                        , () => create.IsSucceeded, @"Src/KeepOpenTemporaryTable/Product.csv");
    
                    var insert = readCSV.Output.Link.AdbInsertTarget(
                        "Insert", cb, tempTable);
    
    
                    _ = new AdbExecuteNonQueryWorker(ws, "DeleteRows", () => insert.IsSucceeded
                        , cb, $@"
                            DELETE p
                            FROM dbo.Products p
                            INNER JOIN {tempTable} tt ON p.ProductId2 = tt.ProductId2
                            ");
                })
                .Start();
        }
    }
    
    /* The example assumes the following table already exists:
        CREATE TABLE dbo.Products
        (
            ProductId2 int
        )
     */
    

    Key Points

    • Unlike AdbConnectionBuilder, the AdbKeepOpenConnectionBuilder instance must be disposed, which in this example is done with DisposeOnFinished<TDisposable>(TDisposable). Other disposing options:
      • UsingActionWorker<TDisposable> (which would have the above workers as its children)
      • Calling Dispose() in an AddCompletedCallback(Func<WorkerBase, OutcomeStatus, Task<OutcomeStatus>>) callback (on an additional worker, again with the above workers as its children)
    • The AdbKeepOpenConnectionBuilder instance is passed to all workers that interacts with the temporary table. Each of these workers creates - opens - closes - disposes the connection as if they owned it, but the underlying .Net connection:
      • Is only created and opened by the first "CreateTable" worker
      • Is only closed and disposed when the connection builder in the parent is disposed
    • Connections are not thread-safe - the library user must ensure only one worker using the connection is running at the same time. Here this is done with:
      • Start constraints on "Read CSV" and "DeleteRows"
      • Via the dataflow link - "Insert" won't start until "Read CSV" is ready to send data
    Note
    • Always use a regular AdbConnectionBuilder if the keep-open feature is not needed
    • Connections and AdbKeepOpenConnectionBuilder instances should be closed and disposed as early as possible - they consume resources, and can timeout if left idle

    Transactions

    To run multiple Adb workers within a single transaction, AdbKeepOpenConnectionBuilder together with AdbTransaction must be used.

    The simplest way to do this is to use AdbTransactionActionWorker, which comes pre-configured for this. Alternatively, AdbTransaction can be used explicitly for greater flexibility.

    AdbTransactionActionWorker Example

    This example uses AdbTransactionActionWorker to perform database work across two database workers, inside a single local transaction:

    • AdbTransactionActionWorker automatically creates and opens a connection and transaction
    • Rows are inserted from one CSV file into two database tables
      • Note that the second insert explicitly waits for the first insert to complete, since a single connection must only be used by a single worker (and thread) at a time
      • Since the two insert workers share an upstream dataflow worker, creating an explicit dependency between them with a start constraint could lead to a deadlock. This is avoided by making the worker with the start constraint buffer all incoming rows. Also see Worker Start Constraints.
    • AdbTransactionActionWorker automatically commits the transaction on success, or rolls back on error, and disposes the transaction and connection
    using actionETL;
    using actionETL.Adb;
    using actionETL.Adb.SqlClientExternal;
    using actionETL.FileHelper;
    using FileHelpers;
    
    public static partial class AdbTransactionWorker
    {
        [DelimitedRecord(",")]
        [IgnoreFirst(1)]
        private sealed class Product
        {
            public int Id;
            public int CategoryId;
            public string Name;
            public string Description;
            public string Notes;
        }
    
        public static SystemOutcomeStatus RunExample()
        {
            return new WorkerSystem()
                .Root(ws =>
                {
                    var akocb = AdbSqlClientProvider.Get()
                        .CreateKeepOpenConnectionBuilder(ws.Config["SqlServer"]);
    
                    var loadProduct = new AdbTransactionActionWorker(ws, "Load Product"
                        , akocb
                        , (ataw, cb) =>
                        {
                            var readCSV = new FileHelperFileSource<Product>(ataw, "Read CSV"
                                , @"Src/AdbTransactionWorker/Product.csv");
    
                            var mc = readCSV.Output.Link.MulticastTransform("Multicast", 2);
    
                            var ic = mc.TypedOutputs[0].Link.AdbInsertTarget(
                                "Insert ProductCore", cb, "dbo.ProductCore");
    
                            var ie = mc.TypedOutputs[1].Link.AdbInsertTarget(
                                "Insert ProductExtra", cb, "dbo.ProductExtra");
                            ie.IsStartable = () => ic.IsCompleted;
                            ie.Input.BufferingMode = PortBufferingMode.Full;
                        });
                    loadProduct.DisposeOnFinished(akocb);
                })
                .Start();
        }
    }
    
    /* The example assumes the following tables already exists:
        CREATE TABLE dbo.ProductCore
        (
            Id int,
            CategoryId int,
            Name nvarchar(50)
        )
    
        CREATE TABLE dbo.ProductExtra
        (
            Id int,
            Description nvarchar(1000),
            Notes nvarchar(1000)
        )
    */
    

    AdbTransaction Example

    This example uses AdbTransaction directly to perform the same database work as in the previous example, across two database workers, inside a single local transaction.

    Note

    Implementing the transaction explicitly as in this example is more complex, and should only very rarely be needed.

    The UsingActionWorker<TDisposable>:

    • Groups the workers, which provides a single point to check if any failed
    • Automatically disposes the AdbKeepOpenConnectionBuilder after the worker finishes.
    • The main action callback:
      • Creates a connection
      • Adds a callback that commits, rollbacks, and disposes the connection and transaction, with exception handling, after the worker completes
      • Opens the connection and begins a transaction
      • Creates the child workers
        • Rows are inserted from one CSV file into two database tables
        • Note that the second insert explicitly waits for the first insert to complete, since a single connection can only be used by a single thread at a time
    using System;
    using actionETL;
    using actionETL.Adb;
    using actionETL.Adb.SqlClientExternal;
    using actionETL.FileHelper;
    using actionETL.Logging;
    using FileHelpers;
    
    public static partial class AdbExplicitTransaction
    {
        [DelimitedRecord(",")]
        [IgnoreFirst(1)]
        private sealed class Product
        {
            public int Id;
            public int CategoryId;
            public string Name;
            public string Description;
            public string Notes;
        }
    
        public static SystemOutcomeStatus RunExample()
        {
            var provider = AdbSqlClientProvider.Get();
    
            return new WorkerSystem()
                .Root(ws =>
                {
                    _ = new UsingActionWorker<AdbKeepOpenConnectionBuilder>(ws, "Load Product"
                        , () => provider.CreateKeepOpenConnectionBuilder(ws.Config["SqlServer"])
                        , (uat, cb) =>
                        {
                            // Create connection, transaction, commit, rollback, and cleanup
                            var adbConnection = cb.Create();
                            uat.AddCompletedCallback((uat2, os) =>
                            {
                                if (adbConnection.Transaction != null)
                                {
                                    if (os.IsSucceeded)
                                    {
                                        try
                                        {
                                            adbConnection.Transaction?.Commit();
                                        }
                                        catch (Exception ex)
                                        {
                                            os = OutcomeStatus.Error(
                                                ALogCategory.DatabaseExceptionCommit, ex);
                                        }
                                    }
                                    if (!os.IsSucceeded)
                                    {
                                        try
                                        {
                                            adbConnection.Transaction?.Rollback();
                                        }
                                        catch (Exception ex)
                                        {
                                            os = os.Combine(OutcomeStatus.Error(
                                                ALogCategory.DatabaseExceptionRollback, ex));
                                        }
                                    }
                                }
    
                                adbConnection.Dispose();
                                return os.ToTask();
                            });
                            adbConnection.Open();
                            adbConnection.BeginTransaction();
    
    
                            // Create workers
                            var readCSV = new FileHelperFileSource<Product>(uat, "Read CSV"
                                , @"Src/AdbExplicitTransaction/Product.csv");
    
                            var mc = readCSV.Output.Link.MulticastTransform("Multicast", 2);
    
                            var ic = mc.TypedOutputs[0].Link.AdbInsertTarget(
                                "Insert ProductCore", cb, "dbo.ProductCore");
    
                            var ie = mc.TypedOutputs[1].Link.AdbInsertTarget(
                                "Insert ProductExtra", cb, "dbo.ProductExtra");
                            ie.IsStartable = () => ic.IsCompleted;
                        });
                })
                .Start();
        }
    }
    
    /* The example assumes the following tables already exist:
        CREATE TABLE dbo.ProductCore
        (
            Id int,
            CategoryId int,
            Name nvarchar(50)
        )
    
        CREATE TABLE dbo.ProductExtra
        (
            Id int,
            Description nvarchar(1000),
            Notes nvarchar(1000)
        )
    */
    

    See Also

    • SQL Database Access
      • Adb Supported Databases
      • Adb Commands and Parameters
      • Adb Information
      • Custom Adb Development
    In This Article
    Back to top Copyright © 2023 Envobi Ltd