We make Data fit for C#.
From local files to cloud scale: LINQ all the way down.

Let IntelliSense and the compiler do the work:
```diff
- df.filter(pl.col("ammount") > 1000)  # Typo? Runtime error.
+ .Where(o => o.Amount > 1000)         // Typo? Won't compile.
```

```bash
# Install via NuGet
dotnet add package DataFlow.Net --version 1.2.1
```

**Contents**

- Sound Familiar?
- Three Simple Rules
- Everything is a Stream
- Quick Start
- Documentation
- Community & Support
## Sound Familiar?

.NET developers know the story: you write a clean, type-safe data processor in C#, and it works perfectly on your dev machine. Then reality hits:
1. **The Data Grows:**
   - 10 MB: `List<T>` works fine.
   - 10 GB: `OutOfMemoryException`. You rewrite using `StreamReader`.
   - 10 TB: You abandon C# for Spark/SQL. You lose type safety and duplicate logic.
2. **The Logic Tangles:**
   - New requirements mean new `if/else` branches.
   - You loop over the same data 5 times to handle 5 different cases.
   - The code becomes spaghetti, and the data lifecycle becomes a black box.
3. **The Source Fragments:**
   - Today it's a CSV file. Tomorrow it's a REST API. Next week it's a Kafka stream.
   - For each source, you write different adapter code.
   - You end up with a "code salad": mixed abstractions, different error handling, and no reuse.
DataFlow.NET was built to stop this cycle:
- ✅ **Unified API**: same code for CSV, JSON, Kafka, Spark
- ✅ **Constant memory**: stream billions of rows without `OutOfMemoryException` (see benchmarks)
- ✅ **No spaghetti**: declarative `Cases` pattern replaces nested `if/else`
- ✅ **Pure C#**: LINQ all the way down
> [!TIP]
> Define the *what*. DataFlow.NET handles the *how*.
DataFlow.NET is more than a framework: it defines a pattern for processing data.
```mermaid
graph LR
    S[**S**ink] --> U[**U**nify]
    U --> P[**P**rocess]
    P --> R[**R**oute]
    R --> A[**A**pply]
    style S fill:#f9f,stroke:#333,stroke-width:2px
    style A fill:#bbf,stroke:#333,stroke-width:2px
```
We call this the **SUPRA** pattern, named for the first letter of each stage: **S**ink, **U**nify, **P**rocess, **R**oute, **A**pply.
> [!NOTE]
> The SUPRA pattern keeps memory constant and lets items flow one at a time. Read the SUPRA-Pattern Guide →
## Three Simple Rules

To apply the SUPRA pattern, follow these rules:

1. **Sink First**: buffer and normalize at the edge, never in the middle.
2. **Flow Lazy**: items stream one by one; memory stays constant.
3. **Route Declaratively**: no more `if/else` spaghetti.
DataFlow.NET provides ready-to-use building blocks that apply these rules natively, as the sketch below illustrates.
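As an illustration, here is how the five stages line up with API calls shown later in this README (a sketch: `Order`, `ProcessHighValue`, and `ProcessVip` are placeholder types and handlers):

```csharp
// SUPRA end to end (sketch; placeholder types and handlers)
await Read.Csv<Order>("orders.csv")                // Sink: normalize at the edge
    // Unify: a single file is already one stream; multiple sources
    // would be merged with new UnifiedStream<Order>().Unify(...)
    .Where(o => o.Amount > 0)                      // Process: lazy, item by item
    .Cases(                                        // Route: declarative branching
        o => o.Amount > 1000,
        o => o.CustomerType == "VIP")
    .SelectCase(                                   // Apply: per-branch handling
        high => ProcessHighValue(high),
        vip => ProcessVip(vip))
    .AllCases()
    .WriteJson("output.json");                     // Apply: terminal write
```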
## Everything is a Stream

DataFlow.NET separates the data source from the processing logic: every source becomes an `IAsyncEnumerable<T>` stream. That is the essence of the "Unified API": same LINQ operators, same processing logic, regardless of origin.
See Integration Patterns Guide →
| Source Type | Pattern | Output |
|---|---|---|
| EF Core (SQL Server, PostgreSQL, etc.) | `.AsAsyncEnumerable()` | `IAsyncEnumerable<T>` |
| JSON/CSV/YAML files | `Read.Json<T>()` / `Read.Csv<T>()` | `IAsyncEnumerable<T>` |
| REST APIs | `.Poll()` + `.SelectMany()` | `IAsyncEnumerable<T>` |
| Kafka / RabbitMQ / WebSocket | Wrap + `.WithBoundedBuffer()` | `IAsyncEnumerable<T>` |
| Snowflake (Premium) | `Snowflake.Connect().Read.Table<T>()` | `SnowflakeQuery<T>` |
| Apache Spark (Premium) | `Spark.Connect().Read.Table<T>()` | `SparkQuery<T>` |
> [!IMPORTANT]
> Any `IAsyncEnumerable<T>` source integrates natively.
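For example, any hand-written async iterator qualifies. A minimal sketch (the `Reading` record and the `readSensorAsync` delegate are hypothetical stand-ins for your own source):

```csharp
using System.Runtime.CompilerServices;

public record Reading(DateTime At, double Value);

public static class SensorSource
{
    // Any async iterator yields IAsyncEnumerable<T>, which is all a
    // DataFlow.NET pipeline needs; items flow one at a time, unbuffered.
    public static async IAsyncEnumerable<Reading> StreamReadings(
        Func<CancellationToken, Task<Reading>> readSensorAsync, // your I/O call
        [EnumeratorCancellation] CancellationToken token = default)
    {
        while (!token.IsCancellationRequested)
        {
            yield return await readSensorAsync(token);
        }
    }
}
```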
Already using Entity Framework Core? DataFlow.NET plugs right in:
```csharp
// EF Core: native support
await dbContext.Orders.AsAsyncEnumerable()
    .Where(o => o.Amount > 100)
    .WriteCsv("orders.csv");
```

- ✅ EF Core handles database access
- ✅ DataFlow.NET handles processing logic
- ✅ Works with SQL Server, PostgreSQL, MySQL, SQLite
Need to integrate REST APIs or message queues? Use polling and buffering:
```csharp
// REST API: poll and flatten
var orders = (() => httpClient.GetFromJsonAsync<Order[]>("/api/orders"))
    .Poll(TimeSpan.FromSeconds(5), token)
    .SelectMany(batch => batch.ToAsyncEnumerable());

// Kafka/WebSocket: wrap the consumer in an async iterator, then buffer
var kafkaStream = ConsumeKafka(token).WithBoundedBuffer(1024);
```

DataFlow.NET also ships high-performance file readers: no reflection on the hot path, and expression trees are compiled once and cached (the general technique is sketched after the list below).
- **4x faster** than standard reflection-based creation (benchmark results →)
- **Zero allocation overhead**: same 48 bytes as native `new()` instantiation
- Handles CSV, JSON, and YAML files generically
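The "compiled once and cached" approach is a standard technique. Here is a minimal sketch of the general idea (illustrative only, not DataFlow.NET's actual internals):

```csharp
using System.Collections.Concurrent;
using System.Linq.Expressions;

public static class FastFactory
{
    private static readonly ConcurrentDictionary<Type, Func<object>> Cache = new();

    // Build `() => (object)new T()` once per type, compile it, cache it.
    // Subsequent calls reuse the delegate: no reflection on the hot path.
    public static T Create<T>() where T : new()
    {
        var make = Cache.GetOrAdd(typeof(T), _ =>
            Expression.Lambda<Func<object>>(
                Expression.Convert(Expression.New(typeof(T)), typeof(object)))
            .Compile());
        return (T)make();
    }
}
```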
We carefully crafted an intuitive, fully featured reader API with advanced error handling, all while streaming row by row.
> [!TIP]
> The row-by-row streaming approach, absent in most other frameworks, is the cornerstone of DataFlow.NET's constant memory usage.
Materialization Quick Reference → | Data Reading Guide →
DataFlow.NET implements additional LINQ extensions to make every data loop composable, even side-effect loops:

- **Independent implementation**: re-implemented `IAsyncEnumerable` methods without depending on `System.Linq.Async`
- **Clear terminal vs. non-terminal separation**: terminal methods (`Do()`, `Display()`) force execution; non-terminal methods (`ForEach()`, `Select()`, `Where()`) stay lazy
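For instance, the distinction in action (a sketch based on the method names above; the exact overload shapes are an assumption):

```csharp
// Non-terminal: nothing executes yet; ForEach only registers a side effect.
var pipeline = orders
    .Where(o => o.Amount > 100)
    .ForEach(o => Console.WriteLine($"saw {o.Id}")); // still lazy

// Terminal: Do() enumerates the stream and triggers every side effect.
await pipeline.Do();
```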
See Extension Methods API Reference →
We've extended standard LINQ with custom operators for declarative branching. Using `Cases`, `SelectCase`, and `ForEachCase`, you can replace complex nested `if/else` blocks with an optimized, single-pass dispatch tree while remaining fully composable, as the before/after sketch below shows.
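A sketch of the contrast (handler names are placeholders; the `if/else` version is the style being replaced):

```csharp
// Before: branching inside a loop, one more `else if` per new rule.
await foreach (var o in orders)
{
    if (o.Amount > 1000)              HandleHighValue(o);
    else if (o.CustomerType == "VIP") HandleVip(o);
}

// After: single-pass declarative routing, still lazy and composable.
await orders
    .Cases(
        o => o.Amount > 1000,
        o => o.CustomerType == "VIP")
    .ForEachCase(
        high => HandleHighValue(high),
        vip => HandleVip(vip))
    .AllCases()
    .Do();
```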
This is the "U" (Unify) step of the SUPRA pattern: absorb many sources into one stream.
```csharp
var unifiedStream = new UnifiedStream<Log>()
    .Unify(fileLogs, "archive")
    .Unify(apiLogs, "live")
    .Unify(dbLogs, "backup");

// Result: a single IAsyncEnumerable<Log> you can query
```

Insert observation points anywhere in your pipeline without changing the data flow. Because `Spy()` is fully composable, you can add or remove traces by commenting out a single line; no rewriting required.
```csharp
await data
    .Where(...)
    .Spy("After filtering")        // see items flow through
    .Select(...)
    .Spy("After transformation")
    .ForEach(...)                  // side-effect iteration, still composable
    .Do();                         // force execution (no output needed)
```

> [!NOTE]
> Due to lazy execution, output from multiple `Spy()` calls appears interleaved (item by item), not grouped by stage. This preserves the streaming nature of the pipeline.
Need to parallelize CPU-intensive or I/O-bound work? DataFlow.NET provides parallel counterparts that work just like their sequential equivalents: still lazy, still composable.
```csharp
// Parallel sync processing
await data.AsParallel()
    .Select(item => ExpensiveCompute(item))
    .ForEach(item => WriteToDb(item))
    .Do();

// Parallel async processing
await asyncStream.AsParallel()
    .WithMaxConcurrency(8)
    .Select(async item => await FetchAsync(item))
    .Do();
```

See ParallelAsyncQuery API Reference → | Parallel Processing Guide → | Extension Methods →
When you hit the limits of local computing power, DataFlow.NET lets you scale seamlessly to the cloud with LINQ-to-Spark and LINQ-to-Snowflake. Your C# lambda expressions are decompiled at runtime and translated into native Spark/SQL execution plans, as sketched below.
- ✅ No data transfer to the client
- ✅ Execution happens on the cluster
- ✅ Full type safety
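Conceptually, the provider pushes your predicates down to the engine. An illustrative sketch (the plan in the comment is a paraphrase, not the provider's literal output):

```csharp
// C# you write: an ordinary typed LINQ query
var bigSales = sfContext.Read.Table<Order>("orders")
    .Where(o => o.Amount > 10000 && o.Year == 2024);

// What conceptually runs on the warehouse (paraphrased):
//   SELECT * FROM orders WHERE amount > 10000 AND year = 2024
// Only matching rows ever leave the cluster.
```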
LINQ-to-Spark Guide → | LINQ-to-Snowflake Guide →
## Quick Start

Prerequisites:

- .NET 8.0 SDK or later
Via NuGet (recommended):

```bash
dotnet add package DataFlow.Net --version 1.2.1
```

Or clone the repository:

```bash
git clone https://github.com/improveTheWorld/DataFlow.NET
cd DataFlow.NET
dotnet run --project DataFlow.Test.UsageExamples/DataFlow.App.UsageExamples.csproj
```

Or open the full solution in Visual Studio 2022: `DataFlow.Net.sln`
```csharp
using DataFlow;

// A complete, memory-efficient pipeline in 10 lines
await Read.Csv<Order>("orders.csv")
    .Cases(
        o => o.Amount > 1000,
        o => o.CustomerType == "VIP"
    )
    .SelectCase(
        highValue => ProcessHighValue(highValue),
        vip => ProcessVip(vip)
    )
    .AllCases()
    .WriteJson("output.json");
```

Your business rule: "Flag high-value transactions from international customers."
```csharp
// 1. DEVELOPMENT: read from a local CSV file
await Read.Csv<Order>("orders.csv")
    .Cases(o => o.Amount > 10000, o => o.IsInternational)  // your logic
    .SelectCase(...)
    .AllCases()
    .WriteCsv("output.csv");

// 2. PRODUCTION: merge multiple async streams
await new UnifiedStream<Order>()
    .Unify(ordersApi, "api")
    .Unify(ordersDb, "db")
    .Cases(o => o.Amount > 10000, o => o.IsInternational)  // SAME logic
    .SelectCase(...)
    .AllCases()
    .WriteJson("output.json");

// 3. CLOUD: query a Snowflake data warehouse
// Filters and aggregations execute on the server
using var sfContext = Snowflake.Connect(account, user, password, database, warehouse);
await sfContext.Read.Table<Order>("orders")
    .Where(o => o.Year == 2024)
    .Cases(o => o.Amount > 10000, o => o.IsInternational)  // SAME logic
    .SelectCase(...)
    .ToList();

// 4. SCALE: run on Apache Spark (petabyte scale)
// Translates your C# expression tree into native Spark orchestration
using var sparkContext = Spark.Connect("spark://master:7077", "MyApp");
sparkContext.Read.Table<Order>("sales.orders")
    .Where(o => o.Amount > 10000)
    .Cases(o => o.Amount > 50000, o => o.IsInternational)  // SAME logic
    .SelectCase(...)
    .AllCases()
    .WriteParquet("s3://data/output");
```

## Documentation

| Topic | Description |
|---|---|
| SUPRA Pattern | The SUPRA pattern deep dive |
| Cases Pattern | The `Cases`/`SelectCase`/`ForEachCase` engine |
| Data Reading | Reading CSV, JSON, YAML, text |
| Materialization Guide | Designing classes for CSV, JSON, YAML, Snowflake, Spark |
| Data Writing | Writing CSV, JSON, YAML, text |
| Stream Merging | `UnifiedStream` & multi-source streams |
| Polling & Buffering | Data acquisition patterns |
| Big Data | Running C# on Apache Spark |
| Snowflake | LINQ-to-Snowflake provider |
| Performance | The zero-allocation engine |
| API Reference | Complete API documentation |
| Extension Methods | `IEnumerable`/`IAsyncEnumerable`/Parallel API matrix |
| Integration Patterns | HTTP, Kafka, EF Core, WebSocket examples |
| Parallel Processing | `ParallelQuery` & `ParallelAsyncQuery` |
| ParallelAsyncQuery | Parallel async processing API |
| Test Coverage | Coverage reports (60% weighted) |
| Roadmap | Future enterprise connectors |
## Community & Support

- **Issues**: GitHub Issues
- **Discord**: Join the Community
- **Email**: tecnet.paris@gmail.com
**DataFlow.NET**: Sink the chaos. Let the rest flow pure.