Data Formats
This article describes how your ETL applications can read and write different data formats via the ETL dataflow, to and from files, strings, streams, etc.
Note
To read or write a data format in the control flow (without involving any dataflow), use .NET or libraries directly, e.g. in an ActionWorker.
CSV Delimited and Fixed Format
See CSV Delimited and Fixed Format for reading and writing CSV delimited as well as fixed format files, strings and streams.
JSON Format
This JSON ETL example demonstrates reading and writing JSON in the ETL dataflow. There is no dedicated JSON worker, since it is straightforward to integrate this functionality using existing .NET libraries.
The reading part of this ETL example:
- Reads a JSON array text file with ReadAllText(String) into a string
- Passes the string to a Json.NET method which deserializes it into an IEnumerable<T>
- Passes the IEnumerable<T> to an EnumerableSource<TOutput>, which starts the dataflow
The writing part of the ETL:
- Collects all target rows with CollectionTarget<TInput>
- In an ActionTarget<TInput>:
- Uses a Json.NET method to serialize the rows into a string
- Passes the string to WriteAllText(String, String) which writes to disk
//using actionETL;
//using Newtonsoft.Json;
//using System.Collections.Generic;
//using System.IO;
var sos = new WorkerSystem()
    .Root(root =>
    {
        var inputRows = JsonConvert.DeserializeObject<List<Stock>>(
            File.ReadAllText("Src/DataFormats/input.json")
            );
        var source = new EnumerableSource<Stock>(root, "source", inputRows);

        // Insert any transform processing...

        var target = source.Output.Link.Collection1Target("target");

        var output = new ActionWorker(root, "output", () => target.IsSucceeded
            , aw =>
            {
                var text = JsonConvert.SerializeObject(target.Rows, Formatting.Indented);
                File.WriteAllText("output.json", text);
            });
    })
    .Start();
Dataflow row:
internal class Stock
{
    public string Item { get; set; }
    public int Quantity { get; set; }
}
JSON data:
[
  {
    "Item": "CD-4267",
    "Quantity": 3
  },
  {
    "Item": "PO-5467",
    "Quantity": 1
  }
]
Note
- The above reading approach is simple and suitable when the data is small enough to load into a single List<T> and a single string. For large data volumes, instead read the data by streaming it and process one row at a time, e.g. using the technique described in this stackoverflow post and sketched after this note.
- On .NET Standard, .NET Core, and .NET 5+, especially for very large data volumes, also consider using the built-in System.Text.Json functionality.
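As a hedged sketch of that streaming technique (the StreamStocks helper below is illustrative and not part of actionETL), Json.NET's JsonTextReader can deserialize one array element at a time, producing a lazy IEnumerable<T> that is passed to the EnumerableSource<TOutput> exactly as before:
//using Newtonsoft.Json;
//using System.Collections.Generic;
//using System.IO;
static IEnumerable<Stock> StreamStocks(string path)
{
    // Deserialize one array element at a time instead of loading
    // the whole file into a string and a List<T>.
    using (var file = File.OpenText(path))
    using (var reader = new JsonTextReader(file))
    {
        var serializer = new JsonSerializer();
        while (reader.Read())
        {
            if (reader.TokenType == JsonToken.StartObject)
            {
                yield return serializer.Deserialize<Stock>(reader);
            }
        }
    }
}
Replacing the inputRows variable in the JSON example above with StreamStocks("Src/DataFormats/input.json") avoids holding the whole file and row list in memory, although the EnumerableSource<TOutput> still occupies a thread while the enumeration runs (see Blocking Threads below).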
Text Format
Processing text line by line (without splitting each line into columns) is a common ETL requirement. Here we use .NET APIs directly to read and write the text lines, carrying each line and its line number in a StringNumberRow, and prefix each written line with its line number.
//using actionETL;
//using System.IO;
//using System.Linq;
var sos = new WorkerSystem()
    .Root(root =>
    {
        var inputLines = File.ReadLines("Src/DataFormats/input.txt")
            .Select((l, n) => new StringNumberRow(l, n));
        var source = new EnumerableSource<StringNumberRow>(root, "source", inputLines);

        // Insert any transform processing...

        var target = source.Output.Link.RowsActionTarget("target", rat =>
        {
            // Invoked once every BufferCapacity rows.
            // Add line number and write lines.
            File.AppendAllLines("output-file.txt"
                , rat.Input.TakeBuffer().Select(r => r.Number + ", " + r.Row));
        });
        target.Input.BufferCapacity = 4096; // Speed up this file IO
    })
    .Start();
Note that since IEnumerable<T> is synchronous, each EnumerableSource<TOutput> in the above example occupies a thread; running many of them in parallel could therefore slow down their start and consume excessive memory.
If this is an issue, instead configure the CSV source (which processes data asynchronously) to read a single column. For reference, we also use a CSV target worker to write a single column.
//using actionETL;
//using actionETL.FileHelper;
//using FileHelpers;
[DelimitedRecord("¬¬")] // Any delimiter not appearing in input
internal class FullRow
{
    public string Row { get; set; }
}

var sos = new WorkerSystem()
    .Root(root =>
    {
        var source = new FileHelperFileSource<FullRow>(root, "source"
            , "Src/DataFormats/input.txt");

        // Insert any transform processing...

        var target = source.Output.Link.FileHelperFileTarget("target", "output.txt");
        target.HeaderText = null;
    })
    .Start();
XLSX Spreadsheet
See XLSX (Excel) Spreadsheets for reading and writing XLSX spreadsheets.
Other Formats
.NET itself and freely available libraries provide easy access to a vast number of data formats, and as shown above with text files and JSON, actionETL makes it easy to integrate these formats into dataflows. You can follow the same approach for any other format that .NET or a library can process.
Reading a format is most easily done by converting the source data into an IEnumerable<T>, which is passed to an EnumerableSource<TOutput> dataflow worker.
Writing a format is normally done in or with a target worker, e.g. RowsActionTarget<TInput>, ActionTarget<TInput> or CollectionTarget<TInput>. If needed, the dataflow rows are converted before being passed to .NET or other library code, which in turn writes them to their destination.
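As a hedged illustration of this pattern, the sketch below reads a hypothetical XML file with LINQ to XML (the file name and element names are assumptions made up for the example), reuses the Stock row class from the JSON example, and mirrors that example's Collection1Target plus ActionWorker approach to write the collected rows back out as XML:
//using actionETL;
//using System.Linq;
//using System.Xml.Linq;
var sos = new WorkerSystem()
    .Root(root =>
    {
        // Read: convert each <stock> element into a dataflow row.
        // XDocument.Load reads the whole file; for very large files,
        // stream with XmlReader instead.
        var inputRows = XDocument.Load("Src/DataFormats/input.xml")
            .Descendants("stock")
            .Select(e => new Stock
            {
                Item = (string)e.Element("Item"),
                Quantity = (int)e.Element("Quantity")
            });
        var source = new EnumerableSource<Stock>(root, "source", inputRows);

        // Insert any transform processing...

        // Write: collect the rows, then serialize them with LINQ to XML.
        var target = source.Output.Link.Collection1Target("target");
        var output = new ActionWorker(root, "output", () => target.IsSucceeded
            , aw =>
            {
                new XElement("stocks", target.Rows.Select(r =>
                    new XElement("stock"
                        , new XElement("Item", r.Item)
                        , new XElement("Quantity", r.Quantity))))
                    .Save("output.xml");
            });
    })
    .Start();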
If more customization and encapsulation is needed, the reading or writing code can be wrapped in a custom worker as has been done with the CSV and XLSX (Excel) Spreadsheets formats. Also see Custom Dataflow Workers.
Performance Considerations
Here are some performance considerations when reading and writing data formats in your ETL applications.
Limit Parallel Work
If processing large data volumes overloads resources such as data sources, network bandwidth, or memory, limit the amount of work running in parallel by using Start Constraints and Grouping Workers.
Streaming
For large data volumes, avoid reading all the data into a collection or string, since this requires holding all the data in memory simultaneously. Instead, stream the data and process rows one at a time (or a buffer at a time).
Blocking Threads
Using an IEnumerable<T> for all the data in a data source (see the reading JSON example) will occupy and block a thread until all the data has been consumed. If this takes a long time, and you do it across many workers at the same time, it could exhaust the thread pool (slowing down performance) and consume an excessive amount of memory.
The above also applies to other synchronous APIs that process all data in one go.
If this is an issue, use APIs that process data in smaller chunks, and/or process the data asynchronously.
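As a hedged, .NET-only sketch (the ReadLinesAsync helper is hypothetical and requires .NET Core 3.0 or later, or .NET 5+), an async iterator releases its thread while waiting on file IO instead of blocking it; consuming such an IAsyncEnumerable<T> in a dataflow requires an asynchronous or custom worker rather than the synchronous EnumerableSource<TOutput>:
//using System.Collections.Generic;
//using System.IO;
static async IAsyncEnumerable<string> ReadLinesAsync(string path)
{
    // Awaiting the IO returns the thread to the thread pool,
    // unlike a synchronous IEnumerable<T>, which blocks it.
    using (var reader = new StreamReader(path))
    {
        string line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            yield return line;
        }
    }
}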