Unified sync/async stream processing with category-based filtering. Single API for IEnumerable/IAsyncEnumerable with Cases/SelectCase/ForEachCase pattern. Eliminates Rx complexity, enables elegant pipeline composition for real-time data processing.

DataFlow.NET

We make Data fit for C#.

From local files to cloud scale — LINQ all the way down.
Let IntelliSense and the compiler do the work.

- df.filter(pl.col("ammount") > 1000)   # Typo? Runtime error.
+ .Where(o => o.Amount > 1000)          // Typo? Won't compile. ✓


# Install via NuGet
dotnet add package DataFlow.Net --version 1.2.1

Table of Contents

  1. Sound Familiar?
  2. Three Simple Rules
  3. Everything is a Stream
  4. Quick Start
  5. Documentation
  6. Community & Support

1. Sound Familiar?

.NET developers know the story: you write a clean, type-safe data processor in C#, it works perfectly on your dev machine, and then reality hits:

  1. The Data Grows:

    • 10 MB: List<T> works fine.
    • 10 GB: OutOfMemoryException. You rewrite using StreamReader.
    • 10 TB: You abandon C# for Spark/SQL. You lose type safety and duplicate logic.
  2. The Logic Tangles:

    • New requirements mean new if/else branches.
    • You loop over the same data 5 times to handle 5 different cases.
    • The code becomes spaghetti, and the data lifecycle becomes a black box.
  3. The Source Fragments:

    • Today it's a CSV file. Tomorrow it's a REST API. Next week it's a Kafka Stream.
    • For each source, you write different adapter code.
    • You end up with a "Code Salad": mixed abstractions, different error handling, and no reuse.

DataFlow.NET was built to stop this cycle:

  • ✅ Unified API — Same code for CSV, JSON, Kafka, Spark
  • ✅ Constant memory — Stream billions of rows without OutOfMemoryException (see benchmarks)
  • ✅ No spaghetti — Declarative Cases pattern replaces nested if/else
  • ✅ Pure C# — LINQ all the way down

Tip

Define the what. DataFlow.NET handles the how.


2. Three Simple Rules

DataFlow.NET is more than a framework — it defines a pattern for processing data.

graph LR
    S[**S**ink] --> U[**U**nify]
    U --> P[**P**rocess]
    P --> R[**R**oute]
    R --> A[**A**pply]
    
    style S fill:#f9f,stroke:#333,stroke-width:2px
    style A fill:#bbf,stroke:#333,stroke-width:2px

We call this the SUPRA pattern — the name is an acronym of the five stages: Sink, Unify, Process, Route, Apply.

Note

The SUPRA pattern ensures memory stays constant and items flow one at a time. Read the SUPRA-Pattern Guide →

To apply the SUPRA pattern, follow three rules:

  1. Sink First — Buffer and normalize at the edge, never in the middle.
  2. Flow Lazy — Items stream one by one. Constant memory.
  3. Route Declaratively — No more if/else spaghetti.

DataFlow.NET provides ready-to-use building blocks for applying each of these rules natively.
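As a mental model, the three rules can be sketched with nothing but the BCL's IAsyncEnumerable (the stage names in the comments are ours; DataFlow.NET's operators replace this hand-written loop):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Sink: normalize the raw source into one element type at the edge.
static async IAsyncEnumerable<string> ReadLines()
{
    foreach (var line in new[] { "ok:1", "err:2", "ok:3" })
    {
        await Task.Yield();   // simulate async I/O
        yield return line;    // Flow Lazy: one item at a time
    }
}

var alerts = 0;
await foreach (var line in ReadLines())    // constant memory: no buffering mid-pipeline
{
    if (line.StartsWith("err")) alerts++;  // Route: pick the category, Apply: act on it
}
Console.WriteLine(alerts);                 // 1
```

The point of the framework is that you never write this loop yourself: the routing and applying steps become declarative operators.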


3. Everything is a Stream

DataFlow.NET provides tools to abstract the data source from the processing logic. Use them to turn every data source into an IAsyncEnumerable<T> stream — the essence of the "Unified API": same LINQ operators, same processing logic, regardless of origin.

See Integration Patterns Guide →

| Source Type | Pattern | Output |
|---|---|---|
| EF Core (SQL Server, PostgreSQL, etc.) | .AsAsyncEnumerable() | IAsyncEnumerable<T> |
| JSON/CSV/YAML Files | Read.Json<T>() / Read.Csv<T>() | IAsyncEnumerable<T> |
| REST APIs | .Poll() + .SelectMany() | IAsyncEnumerable<T> |
| Kafka / RabbitMQ / WebSocket | Wrap + .WithBoundedBuffer() | IAsyncEnumerable<T> |
| Snowflake (Premium) | Snowflake.Connect().Read.Table<T>() | SnowflakeQuery<T> |
| Apache Spark (Premium) | Spark.Connect().Read.Table<T>() | SparkQuery<T> |

Important

Any IAsyncEnumerable<T> source integrates natively.

Examples

Already using Entity Framework Core? DataFlow.NET plugs right in:

// EF Core β€” Native support
await dbContext.Orders.AsAsyncEnumerable()
    .Where(o => o.Amount > 100)
    .WriteCsv("orders.csv");
  • ✅ EF Core handles database access
  • ✅ DataFlow.NET handles processing logic
  • ✅ Works with SQL Server, PostgreSQL, MySQL, SQLite

Need to integrate REST APIs or message queues? Use polling and buffering:

// REST API β€” Poll and flatten
var orders = (() => httpClient.GetFromJsonAsync<Order[]>("/api/orders"))
    .Poll(TimeSpan.FromSeconds(5), token)
    .SelectMany(batch => batch.ToAsyncEnumerable());

// Kafka/WebSocket β€” Wrap in async iterator + buffer
var kafkaStream = ConsumeKafka(token).WithBoundedBuffer(1024);
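ConsumeKafka above is a placeholder for your own consumer. One idiomatic way to bridge a push-based source (a Kafka callback, a WebSocket handler) into an IAsyncEnumerable<T> is a System.Threading.Channels buffer — a sketch of that bridge, not a DataFlow.NET API:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bridge: the producer pushes, the consumer pulls one item at a time.
static async IAsyncEnumerable<string> ConsumePush(CancellationToken token)
{
    var channel = Channel.CreateBounded<string>(1024); // bounded = backpressure at the edge

    // Stand-in for a Kafka/WebSocket callback pushing messages.
    _ = Task.Run(async () =>
    {
        foreach (var msg in new[] { "a", "b", "c" })
            await channel.Writer.WriteAsync(msg, token);
        channel.Writer.Complete();
    }, token);

    await foreach (var msg in channel.Reader.ReadAllAsync(token))
        yield return msg;
}

var received = new List<string>();
await foreach (var msg in ConsumePush(CancellationToken.None))
    received.Add(msg);
Console.WriteLine(string.Join(",", received));   // a,b,c
```

The bounded capacity is what makes this "Sink First": a slow consumer suspends the producer instead of growing an unbounded queue.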

High-Performance Streaming File Readers

DataFlow.NET provides high-performance file readers: no Reflection on the hot path; expression trees are compiled once and cached.

  • 4x faster than standard reflection-based creation (benchmark results →)
  • Zero allocation overhead — same 48 bytes as native new() instantiation
  • Handles CSV, JSON, and YAML files generically.
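The "compiled once and cached" technique behind those numbers can be sketched in a few lines of plain C# (the general pattern, not DataFlow.NET's actual internals): an expression tree for new T() is compiled into a delegate and cached in a static field, so the hot path is a direct delegate call with no reflection.

```csharp
using System;
using System.Linq.Expressions;

var order = Factory<Order>.Create();   // as cheap as new Order() after warm-up
Console.WriteLine(order.Amount);       // 0

public static class Factory<T> where T : new()
{
    // Built and compiled exactly once per T, then cached:
    // every later call is a plain delegate invocation.
    public static readonly Func<T> Create =
        Expression.Lambda<Func<T>>(Expression.New(typeof(T))).Compile();
}

public class Order { public decimal Amount { get; set; } }
```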

We carefully crafted an intuitive, fully-featured readers API with advanced error handling β€” all while streaming row-by-row.

Tip

The streaming row-by-row approach — absent from most other frameworks — is the cornerstone of DataFlow.NET's constant memory usage.

Materialization Quick Reference → | Data Reading Guide →

LINQ Extensions

DataFlow.NET implements additional LINQ extensions to make every data loop composable — even side-effect loops.

  • Independent implementation — Re-implemented IAsyncEnumerable methods without depending on System.Linq.Async
  • Clear terminal vs non-terminal separation — Terminal methods (Do(), Display()) force execution; non-terminal methods (ForEach(), Select(), Where()) stay lazy
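The terminal/non-terminal distinction mirrors plain LINQ's deferred execution. A self-contained illustration of why a terminal step is needed at all (standard LINQ here, not the library's Do()):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var log = new List<int>();

// Non-terminal: composing the query executes nothing.
var query = Enumerable.Range(1, 3)
    .Select(x => { log.Add(x); return x * 2; });

var before = log.Count;        // 0 — still lazy, side effects have not run

// Terminal: enumeration forces the side effects to run.
var total = query.Sum();       // 12

Console.WriteLine($"{before} {log.Count} {total}");   // 0 3 12
```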

See Extension Methods API Reference →

Cases/SelectCase/ForEachCase

We've extended standard LINQ with custom operators for declarative branching. Using Cases, SelectCase, and ForEachCase, you can replace complex nested if/else blocks with an optimized, single-pass dispatch tree — while remaining fully composable.
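Conceptually, Cases turns "match each item against an ordered list of predicates and handle it in exactly one branch, in one pass" into a first-class operator. A plain-C# picture of that single-pass dispatch (our sketch, not the library's implementation):

```csharp
using System;
using System.Collections.Generic;

var amounts = new[] { 1500m, 50m, 200m };
var handled = new List<string>();

// One pass over the data, one category per item — the work that
// nested if/else plus repeated loops would otherwise spread out.
foreach (var amount in amounts)
{
    handled.Add(amount switch
    {
        > 1000m => $"high-value:{amount}",
        < 100m  => $"small:{amount}",
        _       => $"regular:{amount}",
    });
}
Console.WriteLine(string.Join(" ", handled));   // high-value:1500 small:50 regular:200
```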

See Cases Pattern Guide →

Multi-Source Stream Merging

This is the "U" (Unify) step of the SUPRA pattern — "absorb many sources into one stream."

var unifiedStream = new UnifiedStream<Log>()
    .Unify(fileLogs, "archive")
    .Unify(apiLogs, "live")
    .Unify(dbLogs, "backup");
// Result: A single IAsyncEnumerable<Log> you can query

See Stream Merging Guide →

Debug with Spy()

Insert observation points anywhere in your pipeline without changing the data flow. Because Spy() is fully composable, you can add or remove traces by commenting out a single line — no rewriting required.

await data
    .Where(...)
    .Spy("After filtering")       // 👈 See items flow through
    .Select(...)
    .Spy("After transformation")
    .ForEach(...)                 // 👈 Side-effect iteration, still composable
    .Do();                        // 👈 Force execution (no output needed)

⚠️ Note: Due to lazy execution, output from multiple Spy() calls appears interleaved (item by item), not grouped by stage. This preserves the streaming nature of the pipeline.

Go Parallel When You Need To

Need to parallelize CPU-intensive or I/O-bound work? DataFlow.NET provides parallel counterparts that work just like their sequential equivalents — still lazy, still composable:

// Parallel sync processing
await data.AsParallel()
    .Select(item => ExpensiveCompute(item))
    .ForEach(item => WriteToDb(item))
    .Do();

// Parallel async processing
await asyncStream.AsParallel()
    .WithMaxConcurrency(8)
    .Select(async item => await FetchAsync(item))
    .Do();

See ParallelAsyncQuery API Reference → | Parallel Processing Guide → | Extension Methods →

Scale to the cloud (Premium)

If you hit the limits of local computing power, DataFlow.NET lets you scale seamlessly to the cloud with LINQ-to-Spark and LINQ-to-Snowflake. Your C# lambda expressions are decompiled at runtime and translated into native Spark/SQL execution plans.

  • ✅ No data transfer to client
  • ✅ Execution happens on the cluster
  • ✅ Full type safety

LINQ-to-Spark Guide → | LINQ-to-Snowflake Guide →
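The "decompiled at runtime" part rests on a standard C# mechanism: a lambda typed as Expression<TDelegate> reaches the provider as a syntax tree rather than compiled code, and the provider walks that tree to emit SQL or a Spark plan. A minimal inspection sketch (ours, not the premium provider's translator):

```csharp
using System;
using System.Linq.Expressions;

// Typed as an expression tree, the compiler hands us the lambda's structure.
Expression<Func<int, bool>> predicate = amount => amount > 10000;

var body   = (BinaryExpression)predicate.Body;
var column = ((ParameterExpression)body.Left).Name;    // "amount"
var value  = ((ConstantExpression)body.Right).Value;   // 10000

// A real provider recursively walks such trees to build the query text.
var sql = $"WHERE {column} > {value}";
Console.WriteLine(sql);   // WHERE amount > 10000
```

Because the tree carries the lambda's types, typos like a misspelled column name are caught by the C# compiler before any translation happens.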


4. Quick Start

Prerequisites

Installation

Via NuGet (Recommended):

dotnet add package DataFlow.Net --version 1.2.1

Or clone the repository:

git clone https://github.com/improveTheWorld/DataFlow.NET
cd DataFlow.NET

Run the Usage Examples

dotnet run --project DataFlow.Test.UsageExamples/DataFlow.App.UsageExamples.csproj

Or open the full solution in Visual Studio 2022:

DataFlow.Net.sln

Your First Pipeline

using DataFlow;

// A complete, memory-efficient pipeline in 10 lines
await Read.Csv<Order>("orders.csv")
    .Cases(
        o => o.Amount > 1000, 
        o => o.CustomerType == "VIP"
    )
    .SelectCase(
        highValue => ProcessHighValue(highValue),
        vip => ProcessVip(vip)
    )
    .AllCases()
    .WriteJson("output.json");

Advanced: One Logic, Multiple Targets

Your business rule is: "Flag high-value transactions from international customers."

// 1. DEVELOPMENT: Read from a local CSV file
await Read.Csv<Order>("orders.csv")
    .Cases(o => o.Amount > 10000, o => o.IsInternational) // 👈 Your Logic
    .SelectCase(...) 
    .AllCases()
    .WriteCsv("output.csv");

// 2. PRODUCTION: Merge multiple async streams
await new UnifiedStream<Order>()
    .Unify(ordersApi, "api")
    .Unify(ordersDb, "db")
    .Cases(o => o.Amount > 10000, o => o.IsInternational) // 👈 SAME Logic
    .SelectCase(...)
    .AllCases()
    .WriteJson("output.json");

// 3. CLOUD: Query Snowflake Data Warehouse
// Filters and aggregations execute on the server
using var sfContext = Snowflake.Connect(account, user, password, database, warehouse);
await sfContext.Read.Table<Order>("orders")
    .Where(o => o.Year == 2024)
    .Cases(o => o.Amount > 10000, o => o.IsInternational) // 👈 SAME Logic
    .SelectCase(...)
    .ToList();

// 4. SCALE: Run on Apache Spark (Petabyte Scale)
// Translates your C# Expression Tree to native Spark orchestration
using var sparkContext = Spark.Connect("spark://master:7077", "MyApp");
sparkContext.Read.Table<Order>("sales.orders")
    .Where(o => o.Amount > 10000)
    .Cases(o => o.Amount > 50000, o => o.IsInternational) // 👈 SAME Logic
    .SelectCase(...)
    .AllCases()
    .WriteParquet("s3://data/output");

5. Documentation

| Topic | Description |
|---|---|
| 🏰 SUPRA Pattern | The SUPRA Pattern deep dive |
| 🔀 Cases Pattern | The Cases/SelectCase/ForEachCase Engine |
| 📖 Data Reading | Reading CSV, JSON, YAML, Text |
| 🎯 Materialization Guide | Design classes for CSV, JSON, YAML, Snowflake, Spark |
| ✍️ Data Writing | Writing CSV, JSON, YAML, Text |
| 🌊 Stream Merging | UnifiedStream & Multi-Source Streams |
| 🔄 Polling & Buffering | Data acquisition patterns |
| 🔥 Big Data | Running C# on Apache Spark |
| ❄️ Snowflake | LINQ-to-Snowflake Provider |
| 🚀 Performance | The Zero-Allocation Engine |
| 📋 API Reference | Complete API Documentation |
| 🧩 Extension Methods | IEnumerable/IAsyncEnumerable/Parallel API Matrix |
| 🔌 Integration Patterns | HTTP, Kafka, EF Core, WebSocket examples |
| ⚡ Parallel Processing | ParallelQuery & ParallelAsyncQuery |
| ⚡ ParallelAsyncQuery | Parallel async processing API |
| 🧪 Test Coverage | Coverage Reports (60% Weighted) |
| 🗺️ Roadmap | Future Enterprise Connectors |

6. Community & Support

DataFlow.NET — Sink the chaos. Let the rest flow pure. 🚀
