FileFlux is a plugin-based batch file converter with pipeline support.
- Using `pyproject.toml` (recommended): `python -m pip install -e .`
- Or using `requirements.txt`: `python -m pip install -r requirements.txt`
- Run via module: `python -m fileflux --help`
- Or (optional) install editable with `python -m pip install -e .`, then run `fileflux --help`
- Built-in: `python -m fileflux list-plugins`
- With an external plugin dir: `python -m fileflux list-plugins --plugins-dir ./converters`
- CSV -> JSON: `python -m fileflux convert ./data/example.csv --pipeline "csv->json" --output-dir ./out`
- Convert all JSON files in `./data` to CSV: `python -m fileflux convert --input ./data --glob "*.json" --pipeline "json->csv" --output-dir ./out`
- Use `auto` to infer the input format from the file extension: `python -m fileflux convert --input ./data --glob "*.json" --pipeline "auto->csv" --output-dir ./out`
- Multi-step example (requires the relevant plugins; see the chaining sketch after the scenarios below): `python -m fileflux convert --input ./data --glob "*.json" --pipeline "auto->csv->xlsx" --output-dir ./out`
- Scenario: Convert JSON Lines or JSON arrays into CSV for Excel/BI.
- Command: `python -m fileflux convert --input ./data --glob "*.json" --pipeline "json->csv" --output-dir ./out`
- Scenario: Convert CSV into JSON.
- Command: `python -m fileflux convert --input ./data --glob "*.csv" --pipeline "csv->json" --output-dir ./out`
- Scenario: You need an `.xlsx` file for sharing.
- Converter: `csv->xlsx`
- Command: `python -m fileflux convert --input ./data --glob "*.csv" --pipeline "csv->xlsx" --output-dir ./out`
- Scenario: Convert plain CSV to Parquet (choose compression explicitly).
- Converters: `csv->parquet_snappy`, `csv->parquet_zstd`
- Commands:
  - `python -m fileflux convert --input ./data --glob "*.csv" --pipeline "csv->parquet_snappy" --output-dir ./out`
  - `python -m fileflux convert --input ./data --glob "*.csv" --pipeline "csv->parquet_zstd" --output-dir ./out`
- Scenario: Your input is gzipped CSV.
- Converters: `csv.gz->parquet_snappy`, `csv.gz->parquet_zstd`
- Commands:
  - `python -m fileflux convert --input ./data --glob "*.csv.gz" --pipeline "csv.gz->parquet_snappy" --output-dir ./out`
  - `python -m fileflux convert --input ./data --glob "*.csv.gz" --pipeline "csv.gz->parquet_zstd" --output-dir ./out`
- Scenario: You already have Parquet, but want a different codec.
- Converters: `parquet->parquet_snappy`, `parquet->parquet_zstd`
- Commands:
  - `python -m fileflux convert --input ./data --glob "*.parquet" --pipeline "parquet->parquet_snappy" --output-dir ./out`
  - `python -m fileflux convert --input ./data --glob "*.parquet" --pipeline "parquet->parquet_zstd" --output-dir ./out`
- Scenario: Your CSV uses `|` or `\t` as the delimiter.
- Command: `python -m fileflux convert --input ./data --glob "*.csv" --pipeline "csv->json" --csv-delimiter "\\t" --output-dir ./out`
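A multi-step pipeline such as `auto->csv->xlsx` is conceptually just a chain of single-step converters, each reading the previous step's output. Here is a rough sketch of that idea, hypothetical and not FileFlux's actual implementation, using the `Converter` API shown at the bottom of this README:

```python
import tempfile
from pathlib import Path


def run_pipeline(converters, input_path: Path, output_path: Path) -> None:
    """Hypothetical sketch: run converters in sequence, chaining through
    intermediate temp files. Only the final step writes to output_path."""
    current = input_path
    for i, conv in enumerate(converters):
        if i == len(converters) - 1:
            nxt = output_path
        else:
            # Intermediate file carrying this step's output extension.
            tmp = tempfile.NamedTemporaryFile(
                suffix=conv.spec.output_extension, delete=False
            )
            tmp.close()
            nxt = Path(tmp.name)
        conv.convert_file(current, nxt)
        current = nxt
```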
The built-in `csv->xlsx` converter uses openpyxl.
- Install: `python -m pip install openpyxl`
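For orientation, this is roughly what a CSV-to-XLSX conversion with openpyxl looks like; a minimal sketch, not FileFlux's exact code, with placeholder file names:

```python
import csv
from openpyxl import Workbook

wb = Workbook()
ws = wb.active
with open("in.csv", newline="") as f:  # placeholder input path
    for row in csv.reader(f):
        ws.append(row)  # one CSV row per worksheet row
wb.save("out.xlsx")  # placeholder output path
```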
The built-in `csv->parquet` converter uses pyarrow (snappy compression by default).
- Install: `python -m pip install pyarrow`
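The core of such a conversion with pyarrow is small; a minimal sketch (not FileFlux's exact code, with placeholder paths):

```python
import pyarrow.csv as pv
import pyarrow.parquet as pq

# Read the CSV into an Arrow table, then write Parquet with a chosen codec.
table = pv.read_csv("in.csv")                               # placeholder input
pq.write_table(table, "out.parquet", compression="snappy")  # or "zstd"
```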
There are two separate converters (so you can choose the compression in the pipeline):
- `csv.gz->parquet_snappy` (writes `.parquet` with snappy)
- `csv.gz->parquet_zstd` (writes `.parquet` with zstd)

If your input is plain CSV (not gzipped), use:
- `csv->parquet_snappy`
- `csv->parquet_zstd`
If your input is already Parquet, you can recompress it:
- `parquet->parquet_snappy`
- `parquet->parquet_zstd`
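Recompression with pyarrow amounts to a read and a re-write; again a sketch with placeholder paths:

```python
import pyarrow.parquet as pq

table = pq.read_table("in.parquet")                       # placeholder input
pq.write_table(table, "out.parquet", compression="zstd")  # new codec
```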
If you want batch conversion fully in GCS with partitioned output (Hive-style, Iceberg/BigLake-friendly), use the `gcs-parquetds` command.
- Example (CSV.GZ -> partitioned Parquet dataset with ZSTD): `python -m fileflux gcs-parquetds --input gs://YOUR_BUCKET/in/ --glob "*.csv.gz" --recursive --output gs://YOUR_BUCKET/out/my_table/ --partition-by "region,date" --parquet-compression zstd`
This uses Application Default Credentials (ADC). Make sure you've authenticated, e.g. `gcloud auth application-default login`.
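If you want to sanity-check that ADC works from Python before running a batch, a quick probe with the google-cloud-storage client looks like this (an illustrative snippet; the bucket name and prefix are placeholders):

```python
from google.cloud import storage  # pip install google-cloud-storage

client = storage.Client()  # picks up Application Default Credentials
for blob in client.list_blobs("YOUR_BUCKET", prefix="in/"):  # placeholders
    print(blob.name)
```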
If you want a real Iceberg table (Spark-managed metadata on GCS), use the Dataproc job generator:
- Generate commands: `python -m fileflux dataproc-iceberg --project YOUR_PROJECT --region YOUR_REGION --cluster YOUR_CLUSTER --input "gs://YOUR_BUCKET/in/*.csv.gz" --output-mode iceberg --iceberg-catalog-type hadoop --iceberg-warehouse "gs://YOUR_BUCKET/warehouse" --iceberg-table "YOUR_DATASET.my_iceberg_table" --mode overwrite`

Then copy/paste the printed `gcloud dataproc jobs submit ...` command.
If Spark created the Iceberg table on GCS, BigQuery will only “see” it after you register it as an external ICEBERG table.
For BigQuery, use the Iceberg metadata file (`.../metadata/v1.metadata.json`) as the URI.
Example:

```sql
CREATE OR REPLACE EXTERNAL TABLE `YOUR_PROJECT.YOUR_DATASET.my_biglake_iceberg`
WITH CONNECTION `YOUR_PROJECT.YOUR_REGION.YOUR_CONNECTION`
OPTIONS (
  format = 'ICEBERG',
  uris = ['gs://YOUR_BUCKET/warehouse/YOUR_DATASET/my_iceberg_table/metadata/v1.metadata.json']
);
```

Provide a directory of `*.py` plugin files via `--plugins-dir`.
```python
from pathlib import Path

from fileflux.converters.base import Converter, ConverterSpec


class MyConverter(Converter):
    @property
    def spec(self) -> ConverterSpec:
        return ConverterSpec(
            input_format="xml",
            output_format="json",
            input_extensions=(".xml",),
            output_extension=".json",
        )

    def convert_file(self, input_path: Path, output_path: Path) -> None:
        # read input_path, write output_path
        ...


def register():
    return [MyConverter()]
```
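To make the skeleton concrete, here is one way the XML->JSON body could be filled in. This is a minimal sketch with a deliberately naive XML-to-dict mapping; `element_to_dict` is a hypothetical helper introduced for illustration, not part of FileFlux:

```python
import json
import xml.etree.ElementTree as ET
from pathlib import Path

from fileflux.converters.base import Converter, ConverterSpec


def element_to_dict(el: ET.Element) -> dict:
    """Hypothetical helper: naive mapping of attributes, text, children."""
    node = dict(el.attrib)
    if el.text and el.text.strip():
        node["text"] = el.text.strip()
    for child in el:
        # Group repeated child tags into lists.
        node.setdefault(child.tag, []).append(element_to_dict(child))
    return node


class XmlToJsonConverter(Converter):
    @property
    def spec(self) -> ConverterSpec:
        return ConverterSpec(
            input_format="xml",
            output_format="json",
            input_extensions=(".xml",),
            output_extension=".json",
        )

    def convert_file(self, input_path: Path, output_path: Path) -> None:
        root = ET.parse(input_path).getroot()
        output_path.write_text(json.dumps(element_to_dict(root), indent=2))


def register():
    return [XmlToJsonConverter()]
```

Drop the file into a plugin directory (e.g. `./converters`) and it should show up in `python -m fileflux list-plugins --plugins-dir ./converters`.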