Stream-like file processing for PHP, inspired by Gulp.
Pulp moves File objects through a chain of handlers. Source handlers create files, transform handlers edit or replace them, branching handlers fan work out, and destination/result handlers write or collect the final output.
- Pipeline API: Compose file processing as Pulp::start()->pipe(...)->run().
- Virtual files: Handlers pass OpenMapsight\pulp\File objects with fileName, srcFileName, and dynamic metadata.
- Lazy file content: Files from disk are not read until $file->content is accessed.
- Stream access: Large-file handlers can call $file->stream() to read without loading full content.
- Branching: Use split, merge, shadow, and fileSwitch for common flow-control patterns.
- Extensible: Implement handlers directly or build package-level helpers around them.
use OpenMapsight\Pulp;
use OpenMapsight\pulp\File;

Pulp::start()
    ->pipe(Pulp::src('.*\.txt', __DIR__ . '/input'))
    ->pipe(Pulp::map(static function (File $file): File {
        $file->content = strtoupper($file->content);
        return $file;
    }))
    ->pipe(Pulp::dest(__DIR__ . '/output'))
    ->run();

A pipeline is a chain of handlers:
Pulp::start()
    ->pipe(Pulp::src('.*\.json', __DIR__ . '/data'))
    ->pipe(/* handler */)
    ->pipe(Pulp::dest(__DIR__ . '/result'))
    ->run();

run() starts the chain. If no source handler is present, you can pass files directly:
$file = new File('example.txt');
$file->content = 'Hello';

$results = Pulp::start()
    ->pipe(Pulp::map(static fn(File $file): File => $file))
    ->run($file);

OpenMapsight\pulp\File represents a virtual file.
- $file->fileName: the pipeline-relative file name.
- $file->srcFileName: the original source name/path.
- $file->content: dynamic content property. For files sourced from disk, this lazy-loads the full file on first access.
- $file->stream(): returns a readable stream. For path-backed files this opens the source file directly; for generated string content it creates a temporary stream.
- Additional properties can be attached dynamically, for example $file->stats or $file->isLogicallyEmpty.
Use $file->content for normal transforms. Use $file->stream() in handlers that need to process large files without loading them completely.
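The difference between lazy content and stream access can be pictured with a small stand-alone sketch. The class LazySketchFile and the helper countLines() below are hypothetical and are not Pulp's File implementation; the sketch only assumes that lazy content can be modelled with PHP's __get magic method and that a path-backed stream is a plain fopen() handle.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-in for a path-backed virtual file.
 * Not Pulp's File class; it only illustrates lazy content and streaming.
 */
final class LazySketchFile
{
    /** @var string|null Cached content; null until first access. */
    private ?string $loaded = null;

    public function __construct(private string $path)
    {
    }

    /** Reads the whole file on first access to ->content, then reuses it. */
    public function __get(string $name): mixed
    {
        if ($name !== 'content') {
            throw new LogicException("Unknown property: $name");
        }
        if ($this->loaded === null) {
            $content = file_get_contents($this->path);
            if ($content === false) {
                throw new RuntimeException("Cannot read {$this->path}");
            }
            $this->loaded = $content;
        }
        return $this->loaded;
    }

    public function isLoaded(): bool
    {
        return $this->loaded !== null;
    }

    /** Opens the source directly, so the full content is never loaded up front. */
    public function stream()
    {
        $handle = fopen($this->path, 'rb');
        if ($handle === false) {
            throw new RuntimeException("Cannot open {$this->path}");
        }
        return $handle;
    }
}

/** Counts lines through the stream without touching ->content. */
function countLines(LazySketchFile $file): int
{
    $stream = $file->stream();
    $lines = 0;
    try {
        while (fgets($stream) !== false) {
            $lines++;
        }
    } finally {
        fclose($stream);
    }
    return $lines;
}
```

In this sketch, calling countLines() leaves isLoaded() false, while the first read of ->content flips it to true. That is the trade-off the two access styles offer: convenience for normal transforms, bounded memory for large files.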
Pulp::start() creates an empty pipeline.

$pulp = Pulp::start();

Pulp::src($patterns, $directory) creates files from matching paths.

- $patterns: a string, array of strings, file path, or File.
- $directory: base directory for recursive matching.
Patterns are regular expressions matched against relative file names.
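Because patterns are regular expressions rather than shell globs, the escaped dot in '.*\.csv' matters. The following stand-alone sketch shows the idea with preg_match(). The helper matchRelativeNames() is hypothetical, and whether Pulp anchors the pattern or which delimiters it adds is not stated here, so the unanchored "~"-delimited match is an assumption.

```php
<?php
declare(strict_types=1);

/**
 * Returns the relative names that match a regular-expression pattern.
 * Assumption: the pattern is unanchored and wrapped in "~" delimiters.
 *
 * @param string[] $relativeNames
 * @return string[]
 */
function matchRelativeNames(string $pattern, array $relativeNames): array
{
    // Escape the delimiter in case the pattern itself contains "~".
    $regex = '~' . str_replace('~', '\~', $pattern) . '~';

    return array_values(array_filter(
        $relativeNames,
        static fn(string $name): bool => preg_match($regex, $name) === 1
    ));
}

$names = ['report.csv', 'archive/2023/old.csv', 'notes.txt', 'csv-readme.md'];

// '.*\.csv' requires a literal ".csv" somewhere in the relative name,
// so 'csv-readme.md' and 'notes.txt' are not selected.
print_r(matchRelativeNames('.*\.csv', $names));
```

A glob such as '*.csv' is not a valid regular expression, so patterns need the '.*' prefix and an escaped dot, as in the examples in this document.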
Pulp::start()
    ->pipe(Pulp::src('.*\.csv', __DIR__ . '/data'))
    ->run();

Creates one file from a path. If $aliasFileName is provided, it becomes the virtual fileName.

Creates one file from an HTTP response body.

Pulp::dest() writes incoming files into $directory using each file's fileName.

->pipe(Pulp::dest(__DIR__ . '/result'))

Common options:
- flush: call fflush() after writing.
- skipExceptions: log write errors and continue.
- logSkipExceptions: stderr, stdout, or false.
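To make the interplay of these options concrete, here is a stand-alone sketch of a writer that honours them. The helper writeAllSketch() is hypothetical and is not Pulp's dest implementation. The defaults, and the idea that a write error aborts the run unless skipExceptions is set, are assumptions made for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Writes name => content pairs into $directory.
 * Hypothetical helper: option names mirror the list above, semantics are assumed.
 *
 * @param array<string, string> $files
 * @param array{flush?: bool, skipExceptions?: bool, logSkipExceptions?: string|false} $options
 * @return string[] names that were written successfully
 */
function writeAllSketch(array $files, string $directory, array $options = []): array
{
    $flush = $options['flush'] ?? false;               // assumed default
    $skip = $options['skipExceptions'] ?? false;       // assumed default
    $log = $options['logSkipExceptions'] ?? 'stderr';  // assumed default
    $written = [];

    foreach ($files as $name => $content) {
        try {
            $target = $directory . '/' . $name;
            $handle = @fopen($target, 'wb');
            if ($handle === false) {
                throw new RuntimeException("Cannot open $target for writing");
            }
            try {
                fwrite($handle, $content);
                if ($flush) {
                    fflush($handle); // push buffered bytes out right away
                }
            } finally {
                fclose($handle);
            }
            $written[] = $name;
        } catch (RuntimeException $e) {
            if (!$skip) {
                throw $e; // assumption: without skipExceptions the run stops
            }
            if ($log !== false) {
                // Log to the chosen channel and continue with the next file.
                file_put_contents('php://' . $log, $e->getMessage() . PHP_EOL);
            }
        }
    }

    return $written;
}
```

With skipExceptions enabled and logSkipExceptions set to false, a file whose target directory is missing is silently skipped and the remaining files are still written.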
Pulp::map() transforms or drops files. Returning null from the callback drops the file.

->pipe(Pulp::map(static function (File $file): ?File {
    if ($file->fileName === 'skip.txt') {
        return null;
    }
    $file->content .= "\n";
    return $file;
}))

Pulp::filter() keeps only files where the callback returns truthy.

->pipe(Pulp::filter(static fn(File $file): bool => str_ends_with($file->fileName, '.json')))

Pulp::results() collects all files that reach this handler and calls the callback at the end.

->pipe(Pulp::results(static function (array $files): void {
    // inspect results
}))

Pulp::split() feeds the same incoming files into multiple branch pipelines and merges each branch's results back into the main pipeline.
Use this when one source should produce several outputs.
Pulp::start()
    ->pipe(Pulp::src('.*\.txt', __DIR__ . '/input'))
    ->pipe(Pulp::split(
        static fn(Pulp $p): Pulp => $p->pipe(Pulp::map(static function (File $file): File {
            $file->fileName = 'upper-' . $file->fileName;
            $file->content = strtoupper($file->content);
            return $file;
        })),
        static fn(Pulp $p): Pulp => $p->pipe(Pulp::map(static function (File $file): File {
            $file->fileName = 'lower-' . $file->fileName;
            $file->content = strtolower($file->content);
            return $file;
        })),
    ))
    ->pipe(Pulp::dest(__DIR__ . '/result'))
    ->run();

Branches may be Pulp instances or callbacks receiving a new Pulp instance.
Pulp::merge() runs independent pipelines and merges their output.

$a = Pulp::start()->pipe(Pulp::src('.*\.json', __DIR__ . '/a'));
$b = Pulp::start()->pipe(Pulp::src('.*\.json', __DIR__ . '/b'));

Pulp::start()
    ->pipe(Pulp::merge($a, $b))
    ->pipe(Pulp::dest(__DIR__ . '/result'))
    ->run();

Use merge for independent sources. Use split when you already have one incoming stream and want multiple outputs from it.
Pulp::shadow() taps the stream into a side pipeline without changing the main stream.

->pipe(Pulp::shadow(static fn(Pulp $p): Pulp => $p
    ->pipe(Pulp::debug())
))

shadow is useful for diagnostics or side effects. Its branch results are not merged back. Use split if branch output should continue downstream.
Pulp::fileSwitch() routes files into different sub-pipelines by file name.

->pipe(Pulp::fileSwitch([
    '.*\.json' => static fn(Pulp $p): Pulp => $p->pipe(/* JSON handlers */),
    '.*\.xml' => static fn(Pulp $p): Pulp => $p->pipe(/* XML handlers */),
], static fn(Pulp $p): Pulp => $p))

Prints a short preview for each file.
Deletes each file's srcFileName from disk.
Converts string content between ISO-8859-1 and UTF-8.
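For orientation, the byte-level effect of such a conversion can be reproduced with PHP's iconv() function. This is only a sketch of the transformation itself: which function Pulp's handler uses internally is not stated here, and the helper names latin1ToUtf8() and utf8ToLatin1() are hypothetical.

```php
<?php
declare(strict_types=1);

/** Converts ISO-8859-1 (Latin-1) bytes to UTF-8. */
function latin1ToUtf8(string $content): string
{
    $converted = iconv('ISO-8859-1', 'UTF-8', $content);
    if ($converted === false) {
        throw new RuntimeException('Conversion to UTF-8 failed');
    }
    return $converted;
}

/**
 * Converts UTF-8 to ISO-8859-1.
 * On most platforms iconv() reports failure when a character has no
 * Latin-1 equivalent; the exact behaviour depends on the iconv library.
 */
function utf8ToLatin1(string $content): string
{
    $converted = @iconv('UTF-8', 'ISO-8859-1', $content);
    if ($converted === false) {
        throw new RuntimeException('Content has characters outside ISO-8859-1');
    }
    return $converted;
}

$latin1 = "caf\xE9";            // "café" in ISO-8859-1: é is the single byte 0xE9
$utf8 = latin1ToUtf8($latin1);  // in UTF-8, é becomes the two bytes 0xC3 0xA9
echo bin2hex($utf8), PHP_EOL;   // prints 636166c3a9
```

Converting already-UTF-8 content a second time would double-encode it, so a conversion step like this belongs at exactly one place in a pipeline.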
Sends file content as an HTTP response.
Pulp::start()
    ->pipe(Pulp::src('.*\.txt', __DIR__ . '/input'))
    ->pipe(Pulp::map(static function (File $file): File {
        $file->content = trim($file->content) . "\n";
        return $file;
    }))
    ->pipe(Pulp::dest(__DIR__ . '/output'))
    ->run();

Pulp::start()
    ->pipe(Pulp::src('.*\.txt', __DIR__ . '/input'))
    ->pipe(Pulp::split(
        static fn(Pulp $p): Pulp => $p->pipe(/* branch A */),
        static fn(Pulp $p): Pulp => $p->pipe(/* branch B */),
    ))
    ->pipe(Pulp::dest(__DIR__ . '/output'))
    ->run();

->pipe(Pulp::map(static function (File $file): File {
    $stream = $file->stream();
    try {
        while (($line = fgets($stream)) !== false) {
            // Process without loading the full file.
        }
    } finally {
        fclose($stream);
    }
    return $file;
}))

Most handlers extend OpenMapsight\pulp\AbstractHandler.
use OpenMapsight\pulp\AbstractHandler;
use OpenMapsight\pulp\File;

class UppercaseHandler extends AbstractHandler
{
    public function onFile(File $file): void
    {
        $file->content = strtoupper($file->content);
        $this->pushFile($file);
    }
}

Handlers can override:

- onStart(): called before the first file.
- onFile(File $file): called for each file.
- onEnd(): called when the stream ends.
Use $this->pushFile($file) to pass output to the next handler. A handler may emit zero, one, or many files.
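The lifecycle and the "zero, one, or many" rule can be tried out without installing Pulp using the stand-alone sketch below. SketchFile, SketchHandler, its process() driver, and LineSplitter are all hypothetical stand-ins, not Pulp classes. The sketch also assumes that onEnd() may still push files, which the text above does not confirm.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for a file; only the two fields the sketch needs. */
final class SketchFile
{
    public function __construct(public string $fileName, public string $content = '')
    {
    }
}

/** Hypothetical stand-in for AbstractHandler so the sketch runs without Pulp. */
abstract class SketchHandler
{
    /** @var SketchFile[] Files pushed to the (imaginary) next handler. */
    public array $pushed = [];

    public function onStart(): void
    {
    }

    abstract public function onFile(SketchFile $file): void;

    public function onEnd(): void
    {
    }

    protected function pushFile(SketchFile $file): void
    {
        $this->pushed[] = $file;
    }

    /** Drives the lifecycle: onStart, onFile per file, then onEnd. */
    public function process(SketchFile ...$files): array
    {
        $this->onStart();
        foreach ($files as $file) {
            $this->onFile($file);
        }
        $this->onEnd();
        return $this->pushed;
    }
}

/** Emits nothing for empty input, one file per line otherwise, and a summary at the end. */
final class LineSplitter extends SketchHandler
{
    private int $lines = 0;

    public function onFile(SketchFile $file): void
    {
        $rows = $file->content === '' ? [] : explode("\n", rtrim($file->content, "\n"));
        foreach ($rows as $i => $row) {
            $this->lines++;
            $this->pushFile(new SketchFile($file->fileName . '.' . $i, $row));
        }
    }

    public function onEnd(): void
    {
        // Assumption: pushing from onEnd() is allowed.
        $this->pushFile(new SketchFile('summary.txt', "lines: {$this->lines}"));
    }
}
```

Processing a two-line file and an empty file with this sketch yields three outputs: two line files from the first input, none from the second, and one summary file emitted when the stream ends.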
Constructor arguments are declared with getConstructorParamDefs() and accessed via $this->cp.
class PrefixHandler extends AbstractHandler
{
    protected function getConstructorParamDefs(): array
    {
        return ['prefix'];
    }

    public function onFile(File $file): void
    {
        $file->content = $this->cp->prefix . $file->content;
        $this->pushFile($file);
    }
}
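One way to picture how a list of parameter names could end up on $this->cp is the stand-alone sketch below. It is not Pulp's AbstractHandler: ParamSketchHandler and PrefixSketch are hypothetical, and the positional mapping with null for missing arguments is an assumption made for illustration.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in showing one way param defs could map to $this->cp. */
abstract class ParamSketchHandler
{
    protected stdClass $cp;

    public function __construct(mixed ...$args)
    {
        $this->cp = new stdClass();
        // Assumption: defs are positional names; missing arguments become null.
        foreach ($this->getConstructorParamDefs() as $index => $name) {
            $this->cp->{$name} = $args[$index] ?? null;
        }
    }

    /** @return string[] */
    abstract protected function getConstructorParamDefs(): array;
}

final class PrefixSketch extends ParamSketchHandler
{
    protected function getConstructorParamDefs(): array
    {
        return ['prefix'];
    }

    /** Mirrors what PrefixHandler::onFile() does to the content. */
    public function apply(string $content): string
    {
        return $this->cp->prefix . $content;
    }
}

echo (new PrefixSketch('> '))->apply('hello'), PHP_EOL; // prints "> hello"
```

Declaring names in one place keeps subclasses free of constructor boilerplate, at the cost of losing native type hints on the arguments.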