Workflow examples
The following workflow performs a streaming Map/Reduce analysis of tweets about Krakow. It demonstrates an important feature of the HyperFlow workflow model: operation as a continuous process network. Every process works according to the following cycle:
- Awaiting signals. Signals received by a process are queued on its input ports.
- Invoking the function (firing). If a firing condition is fulfilled, i.e. the set of signals required to fire the process has been received, these signals are consumed (removed from the queues) and the function of the process is invoked.
- Emitting output signals. Outputs returned by the function are emitted as signals. The process is then ready for the next firing.
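As the code fragments below suggest, process functions follow a common callback-style signature: `function(ins, outs, executor, config, cb)`. A minimal skeleton (with placeholder names, shown only to fix ideas) might look like this:

```js
// Minimal skeleton of a HyperFlow process function (names are placeholders).
function myProcess(ins, outs, executor, config, cb) {
    var input = ins[0].data[0];   // data of the signal consumed from the first input port
    outs[0].data = [ input ];     // whatever is placed in outs[i].data is emitted as output signals
    cb(null, outs);               // tell the engine that this firing has finished
}
```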
The first process of the workflow (`TweetSource`) is a source process: it has no input signals and just regularly emits output signals (recent tweets about Krakow). To this end, the process is configured with the attribute `firingInterval: 15000`, which tells the engine to fire the process every 15 seconds. The process is actually defined as a `dataflow` process:
{
    "name": "TweetSource",
    "type": "dataflow",
    "function": "twitterSource",
    "firingInterval": 15000,
    "ins": [ ],
    "outs": [ 0 ]
}
Once fired, the function of the process uses the Twitter search API to retrieve all new tweets posted since the last firing. The tweets are then returned as an array of results, which the engine emits as separate signals. In other words, the process can produce zero or more signals in each firing. Let's have a look at how this part is implemented in the process's function:
...
twit.get('/search/tweets', options, function(err, data, response) {
    // ... code omitted //
    data.statuses.forEach(function(t) {
        outs[0].data.push(t); // all elements of the 'outs[0].data' array will be emitted as separate signals
    });
    cb(null, outs);
});
...
The `Partitioner` process is only responsible for routing tweets received from the `TweetSource` to one of four `Mappers`. `Partitioner` is defined as a `choice` process which copies its input signal to one of the outputs, selected on the basis of the CRC checksum of the tweet's text. This is implemented as follows:
var crc = require('crc'); // CRC module (e.g. the 'crc' npm package)

function partitionTweets(ins, outs, executor, config, cb) {
    var tweet = ins[0].data[0];
    var n = (crc.crc32(tweet.text) >>> 0) % outs.length;
    outs[n].condition = "true"; // this tweet will be forwarded to the n-th output port (mapper)
    outs[n].data = [tweet];
    cb(null, outs);
}
The `Mappers` parse tweets to retrieve keywords (URIs, user names, hash tags and words) and emit them as `(keyword, count)` pairs to the `Reducers`. Again, the `Mappers` are defined as `choice` processes in order to partition the keywords among the available `Reducers`. Similarly, the CRC checksum of the keyword is used to determine to which `Reducer` a given pair should be sent.
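The mapper functions are not listed here; a minimal sketch of what such a function could look like follows (the function name is made up and the keyword extraction is deliberately simplified):

```js
var crc = require('crc');

// Sketch of a mapper: extract keywords from a tweet and route each
// (keyword, count) pair to one of the reducer output ports.
function mapTweet(ins, outs, executor, config, cb) {
    var tweet = ins[0].data[0];
    var keywords = tweet.text.split(/\s+/).filter(function(w) { return w.length > 0; });

    keywords.forEach(function(keyword) {
        var n = (crc.crc32(keyword) >>> 0) % outs.length; // pick a reducer for this keyword
        outs[n].condition = "true";
        if (!outs[n].data) { outs[n].data = []; }
        outs[n].data.push({ keyword: keyword, count: 1 }); // a (keyword, count) pair
    });
    cb(null, outs);
}
```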
Finally, the `Reducers` aggregate the pairs received from the `Mappers` (in this case they just count the number of keyword occurrences). Though not currently implemented, the `Reducers` could also update the aggregated statistics in a database.
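A correspondingly simple reducer could be sketched as follows (the function name and the in-memory aggregation are assumptions made for illustration):

```js
// Keyword counts aggregated across firings (kept in module scope for this sketch).
var counts = {};

function reduceKeywords(ins, outs, executor, config, cb) {
    ins[0].data.forEach(function(pair) {
        counts[pair.keyword] = (counts[pair.keyword] || 0) + pair.count;
    });
    outs[0].data = [ counts ]; // emit the current aggregate
    cb(null, outs);
}
```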
The implementation is located in the directory examples/TweetAnalysis.
You can run this workflow with the following command:
hflow run examples/TweetAnalysis
In order to run it, one needs Twitter API credentials configured in the file `twitter.conf.json`:
{
    "consumer_key": "...",
    "consumer_secret": "...",
    "access_token_key": "...",
    "access_token_secret": "..."
}
This example shows a typical large-scale, resource-intensive scientific workflow: Montage. It demonstrates how workflow tasks can be submitted to an external computing infrastructure, in this case a cloud. Montage produces very large mosaic images of the sky. It does that by executing a large pipeline of tasks (mainly image manipulation) whose number, depending on the size of the target image, may easily reach 10,000, as in the example below. In HyperFlow, each Montage task is modeled as a separate `dataflow` process which fires only once. All processes invoke the same function, `amqp_command`, which actually submits a task to a message queue and waits on the same queue for the notification of task completion. The tasks are picked up by Montage workers running on virtual machines deployed on cloud infrastructure.
{
    "name": "Montage_10k",
    "functions": [ {
        "name": "amqp_command",
        "module": "functions"
    } ],
    "processes": [ {
        "name": "mProjectPP",
        "function": "amqp_command", // sends a task to the cloud via a message queue
        "config": { // config object is also passed to the function
            "executable": "mProjectPP",
            "args": "-X -x 0.93860 2mass-atlas-980529s-j0150174.fits p2mass-atlas-980529s-j0150174.fits region_20090720_143653_22436.hdr"
        },
        "ins": [ 0, 3 ],
        "outs": [ 1, 2 ]
    },
    // ... about 10k more processes ...
    {
        "name": "mJPEG", // final workflow task: generation of the final image
        "function": "amqp_command",
        "config": {
            "executable": "mJPEG",
            "args": "-ct 1 -gray shrunken_20090720_143653_22436.fits -1.5s 60s gaussian -out shrunken_20090720_143653_22436.jpg"
        },
        "ins": [ 22960 ],
        "outs": [ 22961 ]
    } ],
    "signals": [ {
        "name": "2mass-atlas-980529s-j0150174.fits" // signals represent files and contain only file names
    },
    // ... about 23k more signals
    ],
    "ins": [ ... ],
    "outs": [ ... ]
}
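The `amqp_command` function itself is not listed here. Conceptually, it might look like the following sketch, which is only an illustration: it assumes the `amqplib` package and made-up queue names, and it ignores the correlation of replies with requests and the error handling that a real implementation needs:

```js
var amqp = require('amqplib/callback_api');

// Conceptual sketch: submit one task to a queue and wait for a completion message.
function amqp_command(ins, outs, executor, config, cb) {
    amqp.connect('amqp://localhost', function(err, conn) {
        if (err) { return cb(err); }
        conn.createChannel(function(err, ch) {
            if (err) { return cb(err); }
            var taskQueue = 'hyperflow.tasks',      // made-up queue names
                resultQueue = 'hyperflow.results';
            ch.assertQueue(taskQueue, { durable: true });
            ch.assertQueue(resultQueue, { durable: true });
            // submit the task (executable + args taken from the process config)
            ch.sendToQueue(taskQueue, Buffer.from(JSON.stringify({
                executable: config.executable,
                args: config.args
            })));
            // wait for the completion notification, then emit the output signals
            ch.consume(resultQueue, function(msg) {
                ch.ack(msg);
                conn.close();
                cb(null, outs);
            });
        });
    });
}
```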
In the `workflows` directory there are examples of Montage workflows which only print the commands that would be executed. You can run them as follows (note the necessary `-s` flag):
hflow run examples/Montage143 -s
In order to run the Montage workflow with the AMQP executor, read this tutorial.
The following workflow creates an image gallery for all images in a given directory tree. To this end, first thumbnails of all images are created, and then a simple HTML page is generated. This is a typical example of batch processing of a collection of files. In this case, the batch jobs are delegated to an external cloud - Google AppEngine.
The first process of the workflow (`GetImageList`) searches a directory tree and generates a sequence of signals containing image paths (a sketch of such a function is shown after the list below). Next, for every such signal the following sequence of steps is performed:
- the image is uploaded to the AppEngine Blobstore (`UploadImage`);
- an AppEngine task is created which gets the image from the Blobstore, creates its thumbnail and writes it back to the Blobstore (`ResizeImage`);
- the image thumbnail is downloaded to a local disk (`DownloadImage`).
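Such a directory walk could be sketched as follows (the function name and the `config.dir` attribute are assumptions; the actual implementation in the example may differ):

```js
var fs = require('fs');
var path = require('path');

// Sketch: recursively collect image paths and emit each one as a separate signal.
function getImageList(ins, outs, executor, config, cb) {
    var root = config.dir || '.'; // root of the directory tree (assumed config attribute)
    outs[0].data = [];

    (function walk(dir) {
        fs.readdirSync(dir).forEach(function(entry) {
            var p = path.join(dir, entry);
            if (fs.statSync(p).isDirectory()) {
                walk(p);
            } else if (/\.(jpe?g|png|gif)$/i.test(entry)) {
                outs[0].data.push(p); // each image path becomes a separate signal
            }
        });
    })(root);

    cb(null, outs);
}
```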
All images are processed in parallel, which is achieved using the `parlevel` parameter, for example:
{
    "name": "UploadImage",
    "type": "dataflow",
    "function": "uploadImage",
    "parlevel": 0,
    "ins": [ "image_path" ],
    "outs": [ "image_id" ]
}
Normally, signals are processed synchronously by a process: they are consumed from the input ports, the process invokes its function, waits for the function's callback, emits the output signals, and only then moves on to processing the next set of input signals. The `parlevel` parameter changes this behavior to asynchronous: the process does not wait for the callback but is immediately ready for the next firing, as long as the required signals are waiting on the input ports. In this way multiple firings of a process can happen in parallel. How many firings are allowed to run concurrently is specified by the value of the `parlevel` parameter (`0` means no limit, while `1` is equivalent to synchronous processing).
Note that the process `GenerateHTML` will not be fired until all images have been processed. To this end, process `GetImageList` has the following line: `"outs": [ "imagePath:imageCount" ]`, while process `GenerateHTML` has one which looks as follows: `"ins": [ "pathHtml", "imageAndThumbPath:imageCount" ]`. Note the same tag `imageCount` used in both cases. This is the signal quantity tag, which is in fact the name of a `count` control signal. This tag/signal relates the number of signal instances produced in a single firing of one process (`GetImageList`) to the number of input signals required to fire another process (`GenerateHTML`).
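Put together, the relevant fragments of the two process definitions look roughly like this (all other attributes are omitted here for brevity):

```json
{
    "name": "GetImageList",
    "outs": [ "imagePath:imageCount" ]
},
{
    "name": "GenerateHTML",
    "ins": [ "pathHtml", "imageAndThumbPath:imageCount" ]
}
```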
Using the terminology of workflow patterns, `parlevel` enables the Thread Split pattern. The inverse counterpart of Thread Split is the Thread Merge pattern: while Thread Split allows one to split up the workflow into multiple parallel tasks, the job of Thread Merge is to wait for these tasks to finish before moving further.
The next workflow performs a 'grep' on a text file: it reads the file line by line, matches each line against a regular expression, and emits the matching lines as results (as well as prints them). Features of the HyperFlow model demonstrated in this example are: control signals and sticky ports.
Control signals, contrary to data signals, are pure, i.e. they do not carry a 'value'; they simply occur or not. In HyperFlow, every signal with the attribute `control: true` is considered a control signal. Control signals differ from data signals as follows:
- they are not passed to the function of the process;
- control signals with certain names have a special influence on the behavior of most processes.
Currently, two names of control signals are distinguished: `next` and `done`; their semantics are as follows:
- `next` signal:
  - if present as an input, it is required for the process to fire (even if all required data signals are ready);
  - if present as an output, it is emitted after every firing of the process.
- `done` signal:
  - if present as an input, it will cause the process to terminate right after the currently ongoing firing is finished;
  - if present as an output, it is emitted right before the process terminates.
A useful pattern for the `next` signal is to control the pace at which signals are produced, so that overflow of the signal queues can be avoided. In the 'Grep File' workflow, the `next` signal is connected from the output of `match` to the input of `getLine`. As a result, successive lines of the file will be produced only after `match` is ready to process them.
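For illustration, a fragment of such a workflow description might look roughly as follows; this is only a hypothetical sketch based on the description above (signal names other than `regexp` and `next` are made up, and the exact schema of the actual example may differ):

```json
"processes": [ {
    "name": "getLine",
    "ins": [ "fileName", "next" ],
    "outs": [ "line" ]
}, {
    "name": "match",
    "ins": [ "line", "regexp" ],
    "outs": [ "matchedLine", "next" ]
} ],
"signals": [
    { "name": "next", "control": true }
]
```

Note that, given the semantics above, an initial `next` signal would presumably also have to be provided so that `getLine` can fire for the first time.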
Every input signal of a process can be declared sticky. Note that this is an attribute of a process input port, not of the signal itself. Signals arriving on sticky ports are processed differently than other signals:
- When a signal is consumed from a sticky port, it is not removed from the input port (it can be consumed again immediately);
- When a signal arrives at a sticky port, it is not queued; instead, it replaces the current signal.
Sticky ports are useful for those inputs of a process that do not change during the process's lifetime (or change only occasionally). This is the case with the regular expression in the 'Grep File' workflow; therefore, the `regexp` signal is declared as sticky for the `match` process. Consequently, it can conveniently be sent to the `match` process only once.
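To round off the example, the `match` function could in principle look like the sketch below (the port ordering and the printing of matches are assumptions; the `next` control signal does not appear in the code at all, since control signals are handled by the engine and not passed to the function):

```js
// Sketch of the 'match' function: ins[0] carries the current line, ins[1] is the
// sticky 'regexp' input, whose value is reused on every firing.
function match(ins, outs, executor, config, cb) {
    var line = ins[0].data[0];
    var regexp = new RegExp(ins[1].data[0]);

    if (regexp.test(line)) {
        console.log(line);       // print the matching line
        outs[0].data = [ line ]; // and emit it as a result signal
    }
    cb(null, outs);
}
```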