Out-of-core function pipelines
PyPlant's goal is to simplify writing data processing pipelines. It helps avoid re-running expensive early stages of the pipeline when only the later stages have changed.
Given a set of Python functions that consume and produce data, it automatically runs them in a correct order and caches intermediate results. When the pipeline is executed again, only the necessary parts are re-run.
Here is a minimal example. We load a large data array and then want to plot some examples:
from pyplant import *

@ReactorFunc
def load_data(pipe):
    data = expensive_operation()
    pipe.send('data', data)

@ReactorFunc
def plot_data(pipe, config):
    data = yield pipe.receive('data')
    plot_examples(data, config.color)

with Plant('/tmp/plant') as plant:
    plant.run_reactor(plot_data)
We structure the code as two PyPlant "reactors", so if we change the plotting function or the configuration and re-run, only the plotting will be re-executed, because the data loading part has not changed.
Why another Python pipeline/workflow library? When writing ML-related scripts, it is often a pain that the whole script needs to re-run when only a small cosmetic change was made. So every time the final plot is tweaked, the data is re-loaded and the model is re-trained.
In fact, this is so common that there is a zoo of various data pipelining libraries out there. However, I still haven't found something that's simple, automatic and can handle large data sizes. Here's a list of "wants" that PyPlant is made to fulfill:
- Simple: Quick to learn, no custom languages or workflow design tools. Start prototyping right away.
- DRY: Function code is the metadata. No need to write execution graphs or external metadata. It just works.
- Automatic: No need to manually re-run outdated parts.
- Large data: Handles data that doesn't fit into memory. Persists between runs.
Install from PyPI
pip install pyplant
or install directly from GitHub:
pip install git+https://github.com/gleb-t/pyplant.git
The Plant
A Plant object needs to be created in order to run code with PyPlant. It manages metadata, schedules execution, and stores/loads ingredients as needed. The only required parameter for a plant is plantDir, which specifies the location where ingredients and plant metadata will be stored. Usually, each project will have its own unique plantDir.
The recommended way to create a Plant is using the with statement, making sure that the plant will be gracefully terminated:
with Plant('/tmp/pyplant/my-project') as plant:
    pass  # Run reactors here.
Reactors
Reactors are units of execution in PyPlant. To specify that a function is a PyPlant reactor, use the @ReactorFunc decorator. Reactors must have one positional argument, through which a Pipework object will be provided:
@ReactorFunc
def plot_data(pipe: Pipework):
    ...  # Reactor body goes here.
Pipework is a small object that is used to send and receive ingredients (more below). To run the reactor, construct a plant and call:
plant.run_reactor(plot_data)
Most of the time you should run the reactors that produce the final artifacts you're interested in (images, metrics, etc.) and let PyPlant decide when to run the rest of the pipeline.
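For instance, in the minimal example above it is enough to run plot_data; PyPlant schedules the upstream reactor only when its output is missing or outdated:

with Plant('/tmp/plant') as plant:
    plant.run_reactor(plot_data)  # load_data is re-executed only if 'data' is missing or stale.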
Technically, before running any reactors, all reactors need to be added to the plant. However, all @ReactorFunc functions are automatically registered when you run the first reactor. If this is not desired (e.g. you manage several different plants), add the reactors explicitly before running any:
plant.add_reactor([load_data, plot_data])
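For example, explicit registration for the minimal example might look like this (the plant directory is made up for illustration):

with Plant('/tmp/pyplant/my-project') as plant:
    plant.add_reactor([load_data, plot_data])  # Register explicitly...
    plant.run_reactor(plot_data)               # ...then run as usual.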
Ingredients
Ingredients are the data/objects that are sent and received by the reactors. PyPlant tracks ingredient dependencies to schedule reactor execution.
To receive an ingredient, use ingredient = yield pipe.receive('name'). The yield keyword is there to pause the reactor's execution in case the ingredient is not yet ready.
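To illustrate, here is a minimal producer/consumer pair (the reactor names and the data are made up):

@ReactorFunc
def produce_numbers(pipe):
    pipe.send('numbers', [1.0, 2.0, 3.0])

@ReactorFunc
def consume_numbers(pipe):
    # Execution pauses here until 'numbers' has been produced.
    numbers = yield pipe.receive('numbers')
    pipe.send('mean', sum(numbers) / len(numbers))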
When an ingredient is sent to the plant, an ingredient type can be provided, which affects how it will be serialized and stored:
pipe.send('name', ingredient, Ingredient.Type.object)
PyPlant will infer the appropriate type in most cases, but you can set it manually if necessary.
Here are the possible types:
- simple: Primitive Python types: int, str, etc.
- list: A Python list of primitive-type objects.
- object: Any Python object that can be pickled.
- array: A NumPy array.
- file: A file stored in the plant dir, passed as a path. Needs to be allocated.
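To make these concrete, here are a few hypothetical explicitly typed sends inside a reactor (the ingredient names are made up; in practice the type argument can usually be omitted):

import numpy as np

pipe.send('n-samples', 500, Ingredient.Type.simple)
pipe.send('class-names', ['cat', 'dog'], Ingredient.Type.list)
pipe.send('features', np.zeros((10, 10)), Ingredient.Type.array)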
Several additional types are available as optional extensions:
- specs.HdfArraySpec: An HDF array. Needs to be allocated; h5py needs to be installed.
- specs.KerasModelSpec: A Keras model, written using save_model(). Keras needs to be installed.
- specs.ScipySparseSpec: A sparse SciPy matrix, similar to NumPy arrays. SciPy needs to be installed.
These 'ingredient specs' need to be registered with the plant before they can be used:
with Plant('/tmp/pyplant/my-project') as plant:
    plant.warehouse.register_ingredient_specs([specs.HdfArraySpec(), specs.ScipySparseSpec()])
When sending, the ingredient type can be specified by passing the spec class (not required, since the type can be inferred):
pipe.send('my-hdf-array', array, specs.HdfArraySpec)
Some ingredient types need to be allocated first, because PyPlant manages their creation. Additional arguments can be provided, depending on the type. Here is an example for an HDF array:
hdf_array = pipe.allocate('hdf-array', specs.HdfArraySpec,
                          shape=(500, 1000, 1000), dtype=np.float32)
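Once allocated, the array can be filled incrementally and then sent like any other ingredient. A sketch, assuming the allocated object supports NumPy-style slice assignment (as h5py datasets do) and that compute_chunk is your own function:

for i in range(500):
    hdf_array[i, ...] = compute_chunk(i)  # Fill one slice at a time to stay out-of-core.
pipe.send('hdf-array', hdf_array, specs.HdfArraySpec)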
Configuration
PyPlant can help manage project configuration and track reactor dependencies on configuration parameters. So, when a parameter is changed, only the affected reactors will be re-run, not the whole plant.
The configuration can be either a dict or a ConfigBase-derived object. It should be provided to the plant before running the reactors:
plant.set_config(config)
When using a dict config, any reactor can read parameters through its Pipework:
color = pipe.read_config('color')
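For example, a dict-based variation of the plotting reactor from the minimal example might look like this (a sketch; plot_examples is assumed from above):

@ReactorFunc
def plot_data(pipe):
    data = yield pipe.receive('data')
    color = pipe.read_config('color')  # Also registers a dependency on 'color'.
    plot_examples(data, color)

with Plant('/tmp/plant') as plant:
    plant.set_config({'color': 'red'})
    plant.run_reactor(plot_data)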
When using the ConfigBase object (recommended), the config should be declared as a class inheriting from ConfigBase:

class MyConfig(ConfigBase):
    def __init__(self):
        super().__init__()
        self.color = 'red'
Any reactor can declare a second argument (besides the pipe) through which it will receive the config object:

@ReactorFunc
def plot_data(pipe, config):
    color = config.color
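Putting the pieces together, a run with the object-based config might look like this (a sketch combining the snippets above):

with Plant('/tmp/pyplant/my-project') as plant:
    plant.set_config(MyConfig())
    plant.run_reactor(plot_data)  # Later changing MyConfig.color re-runs only plot_data.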
Upon reading a configuration parameter, the plant will automatically register the dependency. Object-based configurations are superior to dict-based ones because of their cleaner syntax and the lack of "magic strings" that hinder code analysis tools.
Sometimes, parameters shouldn't trigger re-execution (e.g. temporary dirs, logging level). This can be done by marking the parameter as auxiliary, either on the plant or in the configuration constructor:
# In the plant:
plant.mark_params_as_auxiliary(['log-level'])

# ... or in the config constructor:
def __init__(self):
    # ...
    self.mark_auxiliary(['log-level'])
Subreactors
Subreactors are reusable procedures whose code is also tracked by PyPlant. They can be called from reactors to make sure that changes to their code trigger re-execution of the dependent reactor. So, project code that is prone to implementation changes (while the caller code stays the same) should be wrapped into subreactors. Furthermore, subreactors are also generators managed by PyPlant and can receive and wait for ingredients, just like reactors.
A subreactor function has to be decorated with @SubreactorFunc. To call a subreactor, use the yield from syntax:
returnValue = yield from my_subreactor(argA, argB)
Here is a complete example. Notice that subreactors can be nested:

@SubreactorFunc
def subreactor_a(pipe: Pipework):
    return 3 + 10  # Do stuff.

@SubreactorFunc
def subreactor_b(pipe: Pipework):
    value = yield from subreactor_a(pipe)
    return value

@ReactorFunc
def reactor_a(pipe: Pipework):
    value = yield from subreactor_b(pipe)
    pipe.send('value', value, Ingredient.Type.simple)
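As with plain reactors, only the top-level reactor is run directly; a usage sketch:

with Plant('/tmp/pyplant/my-project') as plant:
    plant.run_reactor(reactor_a)  # Editing subreactor_a's code later would trigger a re-run.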