HTTPX’s async capabilities and Pydantic’s class building took this project over the top. By harnessing them I shifted the codebase from a data mapper to a pipeline orchestrator. I added every format that had an established Python library. Right now I believe it supports JSON, NDJSON, XML, CSV, TSV, PSV, SQLite, and HTML out of the box. Optional extras (pyarrow, ~30 MB) unlock Parquet, Feather, and ORC; Avro and XLSX have their own extras. I also added every compression scheme I could find. Benchmarks, at least on a Windows machine, are on par with other ELT packages.
By focusing on function wrappers that keep the developer’s syntax as simple as possible for the original data mapping calls, I was able to build simple automated pipelines driven by one CLI command and one JSON reference file. The JSON uses basically the same syntax you would use in Python.
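To make the "same syntax as Python" point concrete, here is a rough stdlib-only sketch of how call kwargs and a JSON reference file can mirror each other. It is not Incorporator's actual schema: only `inc_url` comes from the post, while `chunk_size`, `export_format`, and `run_pipeline` are made-up names for illustration.

```python
import json

# Hypothetical call parameters; only `inc_url` appears in the post itself.
call_kwargs = {
    "inc_url": "https://ll.thespacedevs.com/2.2.0/launch/upcoming/",
    "chunk_size": 100,          # assumed name, for illustration only
    "export_format": "ndjson",  # assumed name, for illustration only
}

# Serialise the kwargs the way they might sit in a JSON reference file.
json_text = json.dumps(call_kwargs, indent=2)

# Loading the JSON back gives a dict that can be unpacked into a call.
loaded = json.loads(json_text)


def run_pipeline(**kwargs):
    """Stand-in for a pipeline entry point; it just echoes what it received."""
    return kwargs


result = run_pipeline(**loaded)
```

Because strings, numbers, booleans, lists, and dicts round-trip cleanly through JSON, a config file built this way can carry the same values you would otherwise type into the Python call.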
Both stream and fjord accept inflow and outflow Python code. Inflow code lets you set custom conversion functions and mappings for the incoming data. Outflow code lets you reshape the exported data into an entirely new object.
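As a rough illustration of the inflow/outflow split, the sketch below uses plain Python functions. Every name in it (`parse_utc`, `inflow_converters`, `inflow_renames`, `apply_inflow`, `apply_outflow`) and the sample record are assumptions made for the example, not the package's real hooks.

```python
from datetime import datetime

# Hypothetical raw record, shaped like something an API might return.
raw = {"name": "Demo Launch", "net": "2030-01-02T03:04:05Z", "probability": "80"}


def parse_utc(value: str) -> datetime:
    # Older Python versions reject a trailing "Z", so swap it for an offset.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


# Inflow: per-field converters and key renames applied to incoming data.
inflow_converters = {"net": parse_utc, "probability": int}
inflow_renames = {"net": "launch_time"}


def apply_inflow(record: dict) -> dict:
    out = {}
    for key, value in record.items():
        convert = inflow_converters.get(key)
        new_value = convert(value) if convert else value
        out[inflow_renames.get(key, key)] = new_value
    return out


# Outflow: build an entirely new object for export.
def apply_outflow(record: dict) -> dict:
    return {
        "title": record["name"].upper(),
        "year": record["launch_time"].year,
        "likely": record["probability"] >= 50,
    }


clean = apply_inflow(raw)
exported = apply_outflow(clean)
```

The inflow step keeps the record's shape but fixes types and names; the outflow step is free to throw that shape away and emit something different.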
Also, because your pipeline is basically defined by a JSON file, you should eventually be able to automate the creation of the entire pipeline. Enjoy.
https://github.com/PyPlumber/Incorporator/
How you use it: Declare a subclass with no fields, point it at a URL, and it infers a Pydantic model from the response at runtime, with full strict typing, dot-notation, and an optional registry lookup by any key.

    class Launch(Incorporator):
        pass

    launches = await Launch.incorp(
        inc_url="https://ll.thespacedevs.com/2.2.0/launch/upcoming/"
    )
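For anyone curious what "infer a model from the response at runtime" can look like, here is a minimal stdlib-only sketch. It swaps in `dataclasses.make_dataclass` as a stand-in for Pydantic's dynamic model building, so unlike Pydantic it does not validate types. The sample payload and the `infer_class` helper are hypothetical, and this is not how Incorporator itself is implemented.

```python
from dataclasses import fields, make_dataclass

# Hypothetical payload, standing in for one record of an API response.
sample = {"id": 42, "name": "Demo Launch", "probability": 0.8, "webcast_live": False}


def infer_class(class_name: str, record: dict):
    """Build a class whose fields and annotations mirror the record's keys and value types."""
    spec = [(key, type(value)) for key, value in record.items()]
    return make_dataclass(class_name, spec)


Launch = infer_class("Launch", sample)
launch = Launch(**sample)

# A simple registry keyed by one of the fields, giving lookup by that key.
registry = {launch.name: launch}

# Dot-notation access on the inferred class.
launch_id = registry["Demo Launch"].id
```

This only works when the keys are valid Python identifiers and the record is flat; nested objects and lists would need a recursive version.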
These functions handle the rest of your data mapping and export format needs:
- test() lets the framework write the call kwargs for you
- refresh() re-fetches with the seed call's params auto-replayed
- export() serialises to any of the 13 formats
Then these functions create a pipeline:
- stream() runs a chunked daemon with bounded memory. It can be used in two modes: pass-through, or stateful (in RAM) updates that can be manipulated in real time.
- fjord() fans out N sources and fuses them through a user reducer. It accepts multiple sources and exports.
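The two ideas behind stream() and fjord(), chunked processing with bounded memory and fan-out followed by a user reducer, can be sketched with the standard library alone. This is a conceptual sketch under my own assumptions, not the package's code: `fetch`, `reducer`, `fan_out_and_fuse`, `chunked`, and the source names are all invented for the example.

```python
import asyncio
from itertools import islice


async def fetch(source: str) -> list[dict]:
    """Stand-in for a real network or file read; returns canned rows."""
    await asyncio.sleep(0)  # yield control, as real I/O would
    return [{"source": source, "value": len(source)}]


def reducer(batches: list[list[dict]]) -> dict:
    """User-supplied fuse step: here, the total value per source."""
    fused = {}
    for batch in batches:
        for row in batch:
            fused[row["source"]] = fused.get(row["source"], 0) + row["value"]
    return fused


async def fan_out_and_fuse(sources: list[str]) -> dict:
    # Fan out: start every fetch concurrently and wait for all of them.
    batches = await asyncio.gather(*(fetch(s) for s in sources))
    # Fuse: hand all the batches to the reducer.
    return reducer(list(batches))


def chunked(iterable, size: int):
    """Yield lists of at most `size` items, so only one chunk is held at a time."""
    iterator = iter(iterable)
    while chunk := list(islice(iterator, size)):
        yield chunk


fused = asyncio.run(fan_out_and_fuse(["drivers", "teams", "races"]))
chunks = list(chunked(range(7), 3))
```

`asyncio.gather` preserves the order of its inputs, so the reducer sees batches in the same order as the source list even though the fetches overlap.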
After all of that works, copy the parameters into pipeline.json and the commands can be as simple as:

    incorporator validate pipeline.json
    incorporator fjord pipeline.json --logs
The appendices have more advanced examples. There's a fantasy racing league example with six API calls and one file source feeding three outflows, all run as a single automated fjord pipeline CLI call.