A minimal, extensible framework for building fluent APIs with format-neutral pipeline processing.
PipeFitter provides a unified data representation (FNode) that enables transformation between structured data formats while maintaining semantic consistency. It features a fluent API for building processing pipelines that work consistently across JSON, XML, CSV, and other data formats.
- Format Neutrality: Single data model works with JSON, XML, CSV, YAML, databases, etc.
- Semantic Consistency: Same operations produce consistent results across formats
- Fluent API: Readable, chainable operations with minimal cognitive overhead
- Extensibility: Plugin architecture for custom formats and operations
- Type Safety: Full TypeScript support with compile-time validation
- Zero Dependencies: Core framework has no external runtime dependencies
┌─────────────────────────────────────────────────────────────────┐
│                         PipeFitter Core                         │
├─────────────────────────────────────────────────────────────────┤
│                        Fluent API Layer                         │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │  from() / to()  │  │ map()/filter()  │  │ branch()/merge()│  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
├─────────────────────────────────────────────────────────────────┤
│                          Semantic Layer                         │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │ Format          │  │ Transformation  │  │ Format-Aware    │  │
│  │ Semantics       │  │ Engine          │  │ Operations      │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
├─────────────────────────────────────────────────────────────────┤
│                         Core Data Layer                         │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │ FNode           │  │ Message         │  │ Context         │  │
│  │ (Pure Data)     │  │ (Processing)    │  │ (Execution)     │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
npm install pipefitter
import { PipeFitter, createFNode, FNodeType } from 'pipefitter';
// Create sample data
const userData = createFNode(FNodeType.COLLECTION, 'users');
// Build processing pipeline
const pipeline = new PipeFitter()
  .from(userData)
  .map(msg => {
    // Transform data
    return enhanceUserData(msg);
  })
  .filter(msg => isActiveUser(msg))
  .to(outputAdapter);
import {
  TransformationEngine,
  FormatAwareOperations,
  FormatType,
  JSON_SEMANTICS,
  CSV_SEMANTICS
} from 'pipefitter';
// Setup semantic layer
const engine = new TransformationEngine();
engine.register(JSON_SEMANTICS);
engine.register(CSV_SEMANTICS);
const operations = new FormatAwareOperations(engine);
// Process data in format-aware manner
const pipeline = new PipeFitter()
  .from(jsonData)
  .map(msg => {
    // Filter active users using format-aware operations
    return operations.filter(
      msg,
      user => operations.extractValue(user, 'active', FormatType.JSON) === true,
      FormatType.JSON
    );
  })
  .map(msg => {
    // Transform to CSV format
    return engine.transformMessage(msg, FormatType.JSON, FormatType.CSV);
  })
  .to(csvOutputAdapter);
// Same processing logic works across different input formats
async function processUserData(inputData: any, inputFormat: FormatType) {
const pipeline = new PipeFitter()
.from(parseFormat(inputData, inputFormat))
// Format-aware filtering - works the same regardless of input format
.map(msg => operations.filter(
msg,
user => operations.extractValue(user, 'active', inputFormat) === true,
inputFormat
))
// Business logic transformation
.map(msg => enrichUserData(msg))
// Output in desired format
.map(msg => engine.transformMessage(msg, inputFormat, FormatType.JSON))
.to(JsonAdapter.output());
return pipeline;
}
// Works with any supported format
const jsonResult = await processUserData(jsonData, FormatType.JSON);
const csvResult = await processUserData(csvData, FormatType.CSV);
const xmlResult = await processUserData(xmlData, FormatType.XML);
FNode is the core data representation that can express any structured data format:
interface FNode {
  type: FNodeType;       // Semantic type (COLLECTION, RECORD, FIELD, VALUE)
  name: string;          // Element name
  value?: Primitive;     // Primitive value for leaf nodes
  children?: FNode[];    // Child elements
  attributes?: FNode[];  // Attributes/metadata
  // ... additional properties for namespaces, IDs, etc.
}
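For instance, a small JSON document such as { "users": [{ "name": "Ada", "active": true }] } could be expressed as the FNode tree below. This is an illustrative sketch only: the field names follow the interface above, but the exact node layout an adapter produces may differ.
import { FNode, FNodeType } from 'pipefitter';

// Sketch of one plausible FNode tree for { "users": [{ "name": "Ada", "active": true }] }
const usersNode: FNode = {
  type: FNodeType.COLLECTION,
  name: 'users',
  children: [
    {
      type: FNodeType.RECORD,
      name: 'user',
      children: [
        { type: FNodeType.FIELD, name: 'name', value: 'Ada' },
        { type: FNodeType.FIELD, name: 'active', value: true }
      ]
    }
  ]
};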
Messages wrap FNode data with processing metadata and execution context:
interface Message {
  data: FNode;                                    // Pure data structure
  metadata: Record<string, Record<string, any>>;  // Processing metadata
  context: Context;                               // Execution environment
}
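In practice, map() callbacks receive and return a Message, so a step can attach its own metadata namespace without touching the underlying FNode. A minimal sketch, reusing userData and outputAdapter from the quick-start example and a hypothetical 'audit' namespace:
const audited = new PipeFitter()
  .from(userData)
  .map(msg => ({
    ...msg,
    metadata: {
      ...msg.metadata,
      // record a processing timestamp in a dedicated metadata namespace
      audit: { ...(msg.metadata.audit ?? {}), processedAt: Date.now() }
    }
  }))
  .to(outputAdapter);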
The semantic layer enables format-neutral operations by mapping format-specific structures to universal semantic roles:
- Format Semantics: Define how each format's elements map to semantic roles
- Transformation Engine: Converts between formats via semantic representation
- Format-Aware Operations: Provide consistent functional operations across formats
class PipeFitter {
  from(source: FNode | Adapter): PipeFitter
  to<T>(target: Adapter): T | Promise<T>
  map(fn: (msg: Message) => Message): PipeFitter
  filter(predicate: (msg: Message) => boolean): PipeFitter
  find(predicate: (msg: Message) => boolean): PipeFitter
  branch(predicate: (msg: Message) => boolean): PipeFitter
  merge(strategy: (branch: Message, parent: Message) => Message): PipeFitter
}
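The signatures above suggest one common pattern: branch() routes messages that satisfy a predicate through a sub-pipeline, and merge() folds the branch result back into the parent. A sketch under that assumption, reusing the quick-start helpers:
const merged = new PipeFitter()
  .from(userData)
  .branch(msg => isActiveUser(msg))   // sub-pipeline for active users
  .map(msg => enhanceUserData(msg))   // applied to the branch only
  .merge((branch, parent) => ({
    ...parent,
    // keep the parent's data, combine metadata from both sides
    metadata: { ...parent.metadata, ...branch.metadata }
  }))
  .to(outputAdapter);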
class FormatAwareOperations {
  find(message: Message, predicate: (item: FNode) => boolean, format: FormatType): FNode[]
  filter(message: Message, predicate: (item: FNode) => boolean, format: FormatType): Message
  map<T>(message: Message, mapper: (item: FNode) => T, format: FormatType): T[]
  groupBy(message: Message, keyExtractor: (item: FNode) => string, format: FormatType): Map<string, FNode[]>
  // ... additional operations
}
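For example, groupBy() can bucket records by a field value. A sketch, assuming the operations instance from the semantic-layer example, a Message msg holding a JSON-backed collection, and a hypothetical 'country' field on each record:
// Group user records by country; result is a Map<string, FNode[]> keyed by country
const byCountry = operations.groupBy(
  msg,
  user => String(operations.extractValue(user, 'country', FormatType.JSON)),
  FormatType.JSON
);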
class TransformationEngine {
  register(semantics: FormatSemantics): void
  transform(node: FNode, sourceFormat: FormatType, targetFormat: FormatType): FNode
  transformMessage(message: Message, sourceFormat: FormatType, targetFormat: FormatType): Message
  isCompatible(sourceFormat: FormatType, targetFormat: FormatType): boolean
}
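A typical use is guarding a conversion with isCompatible() before calling transform(). A sketch, assuming the engine from the earlier example (with JSON and CSV semantics registered) and a hypothetical jsonNode:
// Only convert when both formats have registered, compatible semantics
if (engine.isCompatible(FormatType.JSON, FormatType.CSV)) {
  const csvNode = engine.transform(jsonNode, FormatType.JSON, FormatType.CSV);
}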
PipeFitter supports extensions for custom formats and operations:
import { ExtensionRegistry, Extension } from 'pipefitter';
const myExtension: Extension = {
  name: 'my-extension',
  version: '1.0.0',
  methods: [{
    name: 'customOperation',
    implementation: function(this: PipeFitter, options: any) {
      return this.map(msg => customTransform(msg, options));
    }
  }]
};
await ExtensionRegistry.register(myExtension);
// Use custom method
const result = new PipeFitter()
  .from(data)
  .customOperation({ setting: 'value' })
  .to(output);
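Because customOperation() is registered at runtime, the TypeScript compiler will not know about it by default. One way to keep the chain type-safe is declaration merging; a sketch, with a hypothetical option shape:
// Augment the PipeFitter interface so the runtime-registered method type-checks
declare module 'pipefitter' {
  interface PipeFitter {
    customOperation(options: { setting: string }): PipeFitter;
  }
}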
- Core data structures (FNode, Message, Context)
- Fluent API with all major operations
- Semantic layer with format semantics
- Transformation engine for cross-format conversion
- Format-aware operations
- Extension system
- Function composition utilities
- TypeScript support with strict typing
- Resource management and cleanup
- JSON adapter
- CSV adapter
- XML adapter
- Database adapters
- File system adapters
Build all formats (ESM, CommonJS, types, and bundles):
npm run build
Production build with optimizations:
npm run build:prod
Development mode with watch:
npm run build:watch
npm run dev
Lint the source:
npm run lint
After building, the dist/ directory contains:
dist/
├── esm/                     # ES modules (import/export)
├── cjs/                     # CommonJS modules (require/module.exports)
├── types/                   # TypeScript declaration files
└── bundle/                  # Browser bundles
    ├── pipefitter.min.js    # UMD minified bundle
    └── pipefitter.esm.js    # ESM browser bundle
The build process includes bundle size analysis. File sizes are displayed after each build, including gzipped sizes for the minified bundles.
MIT
Contributions are welcome! Please ensure all contributions follow the project's principles of simplicity, consistency, and format neutrality.