New idea for Data-Forge core data pipelines in DF v2 #111

Open
ashleydavis opened this issue Feb 17, 2021 · 7 comments
Labels
version-2 Saving this for Data-Forge version 2.

Comments

@ashleydavis
Member

ashleydavis commented Feb 17, 2021

Just going to document an idea that I had for building data pipelines in Data-Forge.

This is intended for version 2, but I'm thinking it would be more of a "core" library, with a higher-level, more feature-rich library on top that is more like DFN v1.

Please feel free to comment and give feedback!

Some examples follow...

Setting up a data source:

const dataSource = dataForgeFs.fromCsvFile(filePath);

Building a pipeline and invoking core operations:

const pipeline = new dataForge.Pipeline()
    .map(...)       // Transform data.
    .filter(...)    // Filter data.
    .otherOp(...);  // ...other core operations.

Apply the pipeline to the data source:

const pipelineApplication = pipeline.apply(dataSource);

Now the pipeline can be invoked and data extracted in various ways.

  1. Simply iterate the pipeline (using for await for async streaming):

for await (const record of pipelineApplication.toIterator()) {
    // ...
}

  2. Convert the data to an array (assuming it can fit in memory), asynchronously:

const data = await pipelineApplication.toArray();

Or synchronously:

const data = pipelineApplication.toArraySync();

  3. Pipe the data to a data target, asynchronously:

const dataTarget = dataForgeFs.toCsvFile(filePath);
await pipelineApplication.toTarget(dataTarget);

Or synchronously:

pipelineApplication.toTargetSync(dataTarget);

Of course, all the methods can be chained, e.g.:

const data = await new dataForge.Pipeline()
    .map(...)
    .otherOp(...)
    .otherOp(...)
    .apply(dataForgeFs.readCsvFile(filePath))
    .toArray();

Some notes:

  • The parts of the API that matter are async and streaming so that very large files can be processed without having to load the entire file into memory.
  • The laziness is more explicit and hopefully less confusing to beginners (operations aren't executed until data is extracted from an applied pipeline).
  • Decouples data acquisition and data storage from pipeline building and application.
  • Anyone can implement a data source or data target; it just needs to conform to a simple interface (see the sketch after this list).
  • Only data extraction or piping to a data target is asynchronous (or it can optionally be synchronous, but that breaks the streaming model):
    • Setting up a data source and data target is synchronous.
    • Building the pipeline is synchronous.
    • Applying the pipeline is synchronous.
    • Hopefully, this makes things a bit simpler for new users! (async is hard to learn!)
  • Under the hood, the operations in the pipeline are essentially just commands applied to arrays of data. This opens up the possibility that segments of data and stacks of operations could be passed to a lower-level native library for efficient processing.
  • Extensions (what used to be plugins) are more explicit, specific and separate (like Data-Forge FS). This makes it easier for other people to write extensions to Data-Forge.
  • Extensions only depend on the core library, instead of the more complicated higher-level library.
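
To give a feel for how simple that interface could be, here is a minimal sketch (hypothetical names, not the actual DF v2 API): a data source exposes an async iterator over records, and a data target consumes one.

// Hypothetical sketch only; not the actual DF v2 interface.
interface DataSource<RecordT> {
    // Called when an applied pipeline is executed; streams records lazily.
    read(): AsyncIterable<RecordT>;
}

interface DataTarget<RecordT> {
    // Consumes a stream of records and writes it somewhere (file, REST API, database, etc).
    write(records: AsyncIterable<RecordT>): Promise<void>;
}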

Of course, all this is a major departure from the way DF works currently, but there are multiple enhancements that can build on this new API. So I'm thinking this could be more like a "Data-Forge core" library, with a higher-level library (much like DFN v1) that builds on top of it with more features and functionality.

If it were just the core library it would support the following operations:

  • Data transformation (map/select function)
  • Data aggregation (reduce/aggregate function)
  • Data condensation (aggregating pools of like data according to some criteria)
  • Data filtering (filter/where function)
  • Windowing (dividing the data into batches according to some criteria)
  • Rolling windows (dividing the data into overlapping rolling windows according to some criteria; see the sketch below)
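
To illustrate the difference between the last two, here is a small self-contained sketch in plain TypeScript (illustrative only, not Data-Forge code) of batching versus rolling windows over an async stream:

// Illustrative TypeScript only; not Data-Forge code.
async function* batchWindow<T>(source: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
    let batch: T[] = [];
    for await (const item of source) {
        batch.push(item);
        if (batch.length === size) {
            yield batch;
            batch = [];
        }
    }
    if (batch.length > 0) {
        yield batch; // Emit the final, possibly partial, batch.
    }
}

async function* rollingWindow<T>(source: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
    const buffer: T[] = [];
    for await (const item of source) {
        buffer.push(item);
        if (buffer.length > size) {
            buffer.shift();
        }
        if (buffer.length === size) {
            yield [...buffer]; // Each window overlaps the previous one.
        }
    }
}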

What other core operations would this core need to support?

@ashleydavis added the version-2 label Feb 17, 2021
@rat-matheson

This is fantastic. At a high level, it is exactly how I hoped a Data-Forge 2 API might look: really basic operations, with more complex ones imported and applied via map, etc. I haven't had a chance to come up with API suggestions yet, but I will. The operations you've mapped out above look like a good start.

What exactly is apply, and why does it occur after a transformation operation like map in the example above? The dataForgeFs.readCsvFile(filePath) is throwing me off a bit, because I assume we already have a dataframe-based pipeline, so I'm unsure why we are applying another one from a CSV.

@ashleydavis
Member Author

ashleydavis commented Feb 18, 2021

apply is separate from building the pipeline purely so that the two are decoupled. It means you can define a pipeline once and reuse it on different data sets (see the sketch below).
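
A minimal sketch of that reuse, assuming the proposed API (the pipeline, record fields, and file names here are invented for illustration):

// Sketch only, using the proposed (not yet implemented) API.
const cleanPrices = new dataForge.Pipeline()
    .filter(record => record.price !== undefined)
    .map(record => ({ ...record, price: Number(record.price) }));

// The same pipeline applied to two different data sources.
const sales = await cleanPrices.apply(dataForgeFs.fromCsvFile("sales.csv")).toArray();
const refunds = await cleanPrices.apply(dataForgeFs.fromCsvFile("refunds.csv")).toArray();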

That's just an idea... and I realize it seems a bit backward. Let's think on that idea some more.

I could easily structure it the other way around (more like DF v1), e.g.:

const dataSource = dataForgeFs.fromCsvFile(filePath);
const dataTarget = dataForgeFs.toCsvFile(filePath);
await dataSource.map(...).filter(...).etc(....).toTarget(dataTarget);

This kind of example reads better... but it doesn't cleanly separate the data pipeline from the execution of the pipeline.

Please don't let the CSV files put you off. The data source and data target can be anything; indeed, you'll be able to implement your own data sources and targets just by providing a custom function that returns an object in a conventional format, so you'll be able to have code that looks like this:

const dataSource = createMyDataSource(... inputs ...);
const dataTarget = createMyDataTarget(... inputs ...);
await dataSource.map(...).filter(...).etc(....).toTarget(dataTarget);

A data source is almost like a factory: when requested (as a pipeline is being executed), the data source will asynchronously load the data stream from some location (file, REST API, memory location, database, or some custom option).

Same with a data target: when requested (again, as a pipeline is being executed), it will asynchronously save the data stream to some location (again a file, REST API, memory location, database, or whatever you want).
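
To make the factory idea concrete, a custom data source might be as simple as a function returning an object with an async generator (a hypothetical sketch, not a committed API):

// Hypothetical sketch of a custom data source factory.
function createMyDataSource(records: object[]) {
    return {
        // Invoked lazily when an applied pipeline is executed.
        async *read() {
            for (const record of records) {
                yield record; // Could just as easily stream from a REST API or a database.
            }
        },
    };
}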

@ashleydavis
Member Author

By the way this is all just extreme idea generation... so please give me your wildest ideas.

At some point we'll have to come back to reality and try and figure out if we can make all this backward compatible with DF v1.

@rat-matheson

rat-matheson commented Feb 19, 2021

Gotcha... so in that one example above, you created a pre-constructed pipeline and then 'apply' applied it to the data source you passed in. I think I get the gist of it, and I had a similar operation in the streaming library I wrote. It might apply here:

interface Stream<T> {
    // Typical streaming/pipeline functions.
    map<T_MAPPED>(fn: (item: T) => T_MAPPED): Stream<T_MAPPED>;
    filter(fn: (item: T) => boolean): Stream<T>;

    // The juicy bit:
    // the last instance of the stream just prior to the mapStream call gets
    // passed in as the parameter.
    mapStream<T_MAPPED>(fn: (stream: Stream<T>) => Stream<T_MAPPED>): Stream<T_MAPPED>;

    ...
}

So in the context of libraries of complex reusable streams/pipelines, the mapStream function was super useful.

Stream.fromJsonArray<Transaction>(path)
    .filter(transaction => transaction.invoicedCompanyId === 'someId')
    .mapStream(MyCustomPipelines.mapToAggregatedTransactions())
    .find(t => ...)
    ...

It was also useful as a baseOperation for implementing higher level operations like join.
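
For instance, a join could be packaged as a reusable stream-to-stream operation and plugged in via mapStream. A hypothetical sketch, building on the Stream interface above (all names invented for illustration):

// Hypothetical: an inner join packaged as a reusable operation for mapStream.
function joinOn<L, R, K>(right: R[], leftKey: (l: L) => K, rightKey: (r: R) => K) {
    const index = new Map(right.map(r => [rightKey(r), r] as [K, R]));
    return (stream: Stream<L>): Stream<[L, R]> =>
        stream
            .filter(l => index.has(leftKey(l)))
            .map(l => [l, index.get(leftKey(l))!] as [L, R]);
}

It could then be used as stream.mapStream(joinOn(prices, s => s.productId, p => p.productId)).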

I've been thinking about data-forge use cases and about using mapStream (or apply(...)) as a way to implement matrix math. It seems great to me because the server side could use an N-API implementation while the browser side could use a browser implementation, with absolutely no change in the base pipeline implementation!

Apologies in advance for not having a better use case on hand. As a simplified example, say we want to determine the total cost of goods for each company that sells Acme factory products. Let's say we have two data sets.

DataSet 1: CompanyUnitSales

companyId   productId  unitsSold someOtherCol ...
c1          p1         10
c1          p2         5
c2          p1         14
c2          p2         0
...

DataSet 2: AcmeFactoryProductPrices

productId wholesalePrice someOtherCol ...
p1         2.00
p2         3.00
...

So in order to do the matrix math, we need to convert to matrices. Matrix1 for DataSet1 is an MxN matrix where M is the company index and N is the product index:

[
10   5
14   0
]

Matrix2 for DataSet2 is just an Nx1 vector giving the price for each of the N products (where the row index maps to the product id):

[
2.00
3.00
]

so Matrix1 x Matrix2 =

[  35
   28  ]

And we see company1 sells $35 worth of products and company2 sells $28 worth.

Kind of a silly example, but played out on the pipeline it might look like:

let productPricesMatrix = DataForge.fromJsonFile<ProductPrice>(path)
    .mapPipeline(MatrixOps.toSummaryMatrix({
        expandRowIndices:'productId',
        cellValue: 'wholesalePrice',
        // expandColumnIndices: undefined value just inserts a default column
    }))

let companyUnitSales = DataForge.fromJsonFile<CompanyUnitSale>(path2);

companyUnitSales
    .mapPipeline(MatrixOps.toSummaryMatrix({
        expandRowIndices:'companyId',
        expandColumnIndices: 'productId',
        cellValue:'unitsSold'
    }))

    // do optimized matrix math if loading the n-api version of MatrixOps
    .mapPipeline(MatrixOps.crossProductWith(productPricesMatrix))

    //now back to our good old pipeline operations
    .filter(t => ...)
    .toTarget(Targets.createFile('/someFile.json', Format.json))

@ashleydavis
Member Author

Cool example

@rat-matheson

rat-matheson commented Feb 20, 2021

Would it be helpful to put together a small set of use cases? It could be a good starting point for creating some behavioral tests to help guide/justify the v2 API. I like where I think you are going, and perhaps these types of tests could show it working in the wild (even if they all fail due to lack of an implementation).

If this is helpful, is there a particular format you would like? If you have a domain suggestion, and an approach, I can contribute.

@ashleydavis
Member Author

No particular format required, just keep writing down your ideas here. It all helps piece this puzzle together ;)
