ThreadProcessor
This page discusses ThreadProcessor, a critical C++ API for loading larcv data quickly for network training. The discussion is divided into 4 parts.
- What is thread processor?
- CircularBuffer and BatchFillerX ... (what fills what?)
- How does ThreadProcessor work?
- Just give me an example! ... we recommend you read all sections :)
ThreadProcessor is a tool that runs multiple ProcessDriver instances on different threads. It is designed specifically for fast data reading to fill batches of data for training a deep neural network, and it speeds up the reading through multithreading.
A ThreadProcessor configuration looks very similar to that of a ProcessDriver. In fact, there are only 2 additional parameters, NumThreads and NumStorage.
- NumStorage ... defines the size of a circular buffer (CB). Each element of a CB holds one batch of data (the batch size is set at run time).
- NumThreads ... defines the number of threads used to fill a CB. Typically set to the same value as NumStorage.
Below is an example taken from here.
ThreadProcessor: {
  Verbosity: 2
  EnableFilter: false
  RandomAccess: false
  InputFiles: ["copy00.root"]
  ProcessType: ["BatchFillerImage2D","BatchFillerImage2D"]
  ProcessName: ["test","label"]
  NumThreads: 6
  NumStorage: 6
  ProcessList: {
    test: {
      CaffeMode: false
      Verbosity: 2
      ImageProducer: "data"
      Channels: [0,1,2]
    }
    label: {
      CaffeMode: false
      Verbosity: 2
      ImageProducer: "segment"
      Channels: [0,1,2]
    }
  }
}
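To make the relation between ProcessType, ProcessName and ProcessList explicit, here is a minimal sketch in plain Python. It is not larcv code and does not parse the configuration file: the dictionary simply mirrors the example above, and `check_config` is a hypothetical helper. The rules it checks (parallel lists, unique names, one ProcessList block per name) are assumptions read off the example, not a statement of what larcv enforces.

```python
# Plain-Python mirror of the example configuration (illustration only).
config = {
    "ProcessType": ["BatchFillerImage2D", "BatchFillerImage2D"],
    "ProcessName": ["test", "label"],
    "NumThreads": 6,
    "NumStorage": 6,
    "ProcessList": {
        "test": {"ImageProducer": "data", "Channels": [0, 1, 2]},
        "label": {"ImageProducer": "segment", "Channels": [0, 1, 2]},
    },
}


def check_config(cfg):
    """Return a list of human-readable problems (empty if none were found)."""
    problems = []
    # Assumed rule: the i-th type belongs to the i-th name.
    if len(cfg["ProcessType"]) != len(cfg["ProcessName"]):
        problems.append("ProcessType and ProcessName differ in length")
    # Names identify the batch data, so they should not repeat.
    if len(set(cfg["ProcessName"])) != len(cfg["ProcessName"]):
        problems.append("ProcessName entries are not unique")
    # Assumed rule: every named process has its own parameter block.
    for name in cfg["ProcessName"]:
        if name not in cfg["ProcessList"]:
            problems.append("no ProcessList block for '{}'".format(name))
    return problems


problems = check_config(config)
print(problems if problems else "configuration looks consistent")
```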
Here we discuss in a bit more detail what a CB is and how it is implemented and filled. We start by introducing the term BatchFillerX to represent a set of C++ classes responsible for filling a CB.
In this document (i.e. not in our code!), BatchFillerX represents the set of C++ classes that inherit from BatchFillerTemplate, an abstract base class for filling a batch data of type T. A few important facts about BatchFillerX:
- They are responsible for filling one (and only one, i.e. unique) batch data of a certain type T.
  - type T could be int16, uint16, int32, uint32, int64, uint64, float32, float64.
- Their direct base class BatchFillerTemplate implements the mechanism of filling/clearing a batch data.
- They inherit from ProcessBase, and thus each has a unique string name at run time.
- They inherit from BatchHolder, which overloads the ProcessBase::is(std::string) function.
  - It returns true for is("BatchFiller").
  - ThreadProcessor uses this (and you can too) to identify which ProcessBase instances are BatchFillerX.
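The identification trick in the last bullet can be sketched in a few lines of plain Python. This is only a toy model of the inheritance described above, not the larcv classes themselves: the class bodies are assumptions, `SomeOtherProcess` is a hypothetical non-filler module, and the method is spelled `is_` because `is` is a reserved word in Python.

```python
class ProcessBase:
    """Toy stand-in: every process has a unique run-time name."""

    def __init__(self, name):
        self.name = name

    def is_(self, question):
        # By default a process does not claim to be anything special.
        return False


class BatchHolder(ProcessBase):
    """Toy stand-in: overrides is_() so fillers can be recognised."""

    def is_(self, question):
        return question == "BatchFiller"


class BatchFillerImage2D(BatchHolder):
    """One kind of BatchFillerX (class name borrowed from the config above)."""


class SomeOtherProcess(ProcessBase):
    """Hypothetical process that is not a batch filler."""


processes = [
    BatchFillerImage2D("test"),
    SomeOtherProcess("filter"),
    BatchFillerImage2D("label"),
]

# Pick out the batch fillers the same way the text describes.
filler_names = [p.name for p in processes if p.is_("BatchFiller")]
print(filler_names)
```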
For example, in the configuration script shown above, BatchFillerImage2D is one kind of BatchFillerX; it fills a batch data of float32 from the larcv::Image2D data in the input larcv file. The assigned batch data is uniquely determined by the name (test and label in this case).
A unique CB is created for each instance of BatchFillerX. All CBs are owned and maintained by BatchDataStorageFactory of type T. Each CB is uniquely identified by the name of its BatchFillerX and can be accessed through the function BatchDataStorageFactory::get_storage(std::string name). The size of a CB, which can be queried with BatchDataStorage::num_batch(), is the same as the value you set with the NumStorage configuration parameter described earlier.
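As a rough picture of "one named CB per filler, all owned by a factory", here is a minimal pure-Python sketch. Only the names `get_storage`, `num_batch` and `get_batch` come from this page; `make_storage`, `set_batch` and the class bodies are hypothetical and say nothing about how larcv actually implements its storage.

```python
class BatchDataStorage:
    """Toy circular buffer: a fixed number of slots, one batch per slot."""

    def __init__(self, num_batch):
        self._slots = [None] * num_batch

    def num_batch(self):
        return len(self._slots)

    def set_batch(self, storage_id, data):  # hypothetical helper
        self._slots[storage_id] = data

    def get_batch(self, storage_id):
        return self._slots[storage_id]


class BatchDataStorageFactory:
    """Toy owner of all circular buffers, keyed by filler name."""

    def __init__(self):
        self._storages = {}

    def make_storage(self, name, num_batch):  # hypothetical helper
        if name in self._storages:
            raise ValueError("storage '{}' already exists".format(name))
        self._storages[name] = BatchDataStorage(num_batch)
        return self._storages[name]

    def get_storage(self, name):
        return self._storages[name]


factory = BatchDataStorageFactory()
for filler_name in ["test", "label"]:  # names from the example config
    factory.make_storage(filler_name, num_batch=6)  # NumStorage: 6

factory.get_storage("test").set_batch(0, [0.0, 1.0, 2.0])
print(factory.get_storage("test").num_batch())
```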
Now that you have learned about BatchFillerX and CBs, we can discuss the big picture of how ThreadProcessor works. Upon configuration with the parameter NumStorage=M, ThreadProcessor creates:
- Circular buffer(s) of size M
- M instance(s) of ProcessDriver
Each ProcessDriver instance is dedicated to filling a unique element of the CBs. Typically we use multiple CBs of size M. In an image-classification example, you want separate CBs for the image data and the image labels. In the example configuration script above, there are 2 CBs of the same type (i.e. the type defined by BatchFillerImage2D) but with different names, test and label.
Once configured, ThreadProcessor is ready to run. You can run it in 2 ways:
- Call ThreadProcessor::batch_process(size_t batch_size).
  - It creates a new thread to fill a CB with the number of entries specified by the batch_size value.
  - If the number of already-running threads exceeds N (the number of threads, NumThreads), it does nothing and returns false.
- Call ThreadProcessor::start_manager(size_t batch_size).
  - This instantiates a manager thread which continuously calls ThreadProcessor::batch_process(size_t batch_size).
  - ... and call ThreadProcessor::stop_manager() when finished (at the end of your code)!
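The two ways of running can be mimicked with Python's standard `threading` module. The sketch below is a toy model and not the larcv implementation: `ToyThreadProcessor`, its string states, and the "read" step (which just produces a range of integers) are all assumptions made for illustration. It also refuses to start a fill as soon as the thread limit is reached or no empty slot is left, which is a simplification of the rule described above.

```python
import threading
import time


class ToyThreadProcessor:
    """Toy model only: at most N fill threads, M storage slots."""

    def __init__(self, num_threads, num_storage):
        self._num_threads = num_threads
        self._states = ["empty"] * num_storage
        self._batches = [None] * num_storage
        self._running = 0     # fill threads currently alive
        self._next_entry = 0  # next entry index to "read"
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._manager = None

    def batch_process(self, batch_size):
        """Start one fill thread; return False if none could be started."""
        with self._lock:
            if self._running >= self._num_threads or "empty" not in self._states:
                return False
            slot = self._states.index("empty")
            self._states[slot] = "filling"
            self._running += 1
            first = self._next_entry
            self._next_entry += batch_size
        threading.Thread(target=self._fill, args=(slot, first, batch_size)).start()
        return True

    def _fill(self, slot, first, batch_size):
        batch = list(range(first, first + batch_size))  # stand-in for reading entries
        with self._lock:
            self._batches[slot] = batch
            self._states[slot] = "filled"
            self._running -= 1

    def start_manager(self, batch_size):
        """Spawn a manager thread that keeps calling batch_process."""
        def loop():
            while not self._stop.is_set():
                if not self.batch_process(batch_size):
                    time.sleep(0.001)  # nothing to start right now
        self._stop.clear()
        self._manager = threading.Thread(target=loop)
        self._manager.start()

    def stop_manager(self):
        """Stop the manager thread and wait for it to exit."""
        self._stop.set()
        if self._manager is not None:
            self._manager.join()
            self._manager = None

    def states(self):
        with self._lock:
            return list(self._states)
```

In this toy, calling `start_manager(10)` on `ToyThreadProcessor(num_threads=3, num_storage=4)` fills all four slots and then idles until slots are emptied again, which is why a way to hand slots back is needed.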
As you may guess, you almost always want to use the start_manager function. Here are two more important functions you may use frequently:
- ThreadProcessor::storage_status_array()
  - returns an array of statuses defined by enum larcv::BatchDataState_t.
  - tells you which batch data is ready to use (i.e. finished being filled).
  - the index of the array corresponds to a particular storage id.
- ThreadProcessor::release_data(size_t storage_id)
  - flags a particular CB element as available for refilling (i.e. you are done using this data).
  - storage_id is 0 or a positive integer smaller than M, corresponding to an index of the status array.
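The check-then-release pattern implied by these two functions can be sketched without larcv. Everything below is a toy: `SlotState`, `ToyStatusBoard`, `set_state` and `consume_ready` are hypothetical names, and the real status values are whatever enum larcv::BatchDataState_t defines. The choice to ignore a release request for a slot that is not filled is also just a decision made for this sketch.

```python
from enum import Enum


class SlotState(Enum):
    """Hypothetical slot states for this sketch."""
    EMPTY = "empty"
    FILLING = "filling"
    FILLED = "filled"


class ToyStatusBoard:
    """Toy model of the status array for M storage slots."""

    def __init__(self, num_storage):
        self._states = [SlotState.EMPTY] * num_storage

    def storage_status_array(self):
        # One status per storage id; the list index is the storage id.
        return list(self._states)

    def set_state(self, storage_id, state):  # stand-in for the fill threads
        self._states[storage_id] = state

    def release_data(self, storage_id):
        if not 0 <= storage_id < len(self._states):
            raise IndexError("storage_id must be in [0, M)")
        # Toy rule: only a filled slot can be handed back for refilling.
        if self._states[storage_id] is SlotState.FILLED:
            self._states[storage_id] = SlotState.EMPTY


def consume_ready(board):
    """Use every filled slot once, release it, and return the ids used."""
    used = []
    for storage_id, state in enumerate(board.storage_status_array()):
        if state is not SlotState.FILLED:
            continue  # still empty or being filled
        used.append(storage_id)  # ... access the batch data here ...
        board.release_data(storage_id)
    return used


board = ToyStatusBoard(num_storage=4)
board.set_state(1, SlotState.FILLED)
board.set_state(2, SlotState.FILLING)
board.set_state(3, SlotState.FILLED)
ready_ids = consume_ready(board)
print(ready_ids)
```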
The GIF animation below is a pictorial description of how ThreadProcessor works. Here, N=3 and M=4.
Enough English text! You must be fed up and just want example code to run, right? It's actually pretty simple. Below you will find an example of running ThreadProcessor and accessing batch data.
Just one last comment before we end with an example: although managing ThreadProcessor directly is possible through the standard Python APIs (and it's fairly simple already), there is a separate Python wrapper class to simplify our lives further. [Read here] for details.
import numpy as np
from larcv import larcv

# Literally 3 lines to: 0) instantiate, 1) configure, 2) start running ThreadProcessor
proc = larcv.ThreadProcessor("ThreadProcessor")  # This should match the name of the big configuration block
proc.configure("test_image2d.cfg")  # Example from larcv/app/ThreadIO/test/test_image2d.cfg
proc.start_manager(10)  # Using batch size 10

# Let's create a list of circular-buffer (storage) IDs
storage_ids = np.arange(0, proc.storage_status_array().size())
# Let's also create lists of BatchFillerX names & types
filler_names = [proc.storage_name(i) for i in proc.batch_fillers()]
filler_types = [larcv.BatchDataTypeName(proc.batch_types()[i]) for i in proc.batch_fillers()]

try:
    # Now an infinite loop to keep reading data (interrupt, e.g. with Ctrl-C, to leave it)
    while True:
        status_v = proc.storage_status_array()
        for storage_id in storage_ids:
            status = int(status_v[storage_id])
            if status != 3:
                continue  # Data still being filled for this circular buffer
            print("Buffer ID {:d} filled!".format(storage_id))
            # access data here
            for idx, name in enumerate(filler_names):
                factory = larcv.BatchDataStorageFactory(filler_types[idx]).get()
                data = factory.get_storage(name).get_batch(storage_id).data()
                print("Data name {:s} type {:s} size {:d}".format(name, filler_types[idx], data.size()))
            # done using data? Release so that a thread will re-fill it with new data
            proc.release_data(storage_id)
finally:
    # Don't forget to stop the manager & reset (this ensures all threads are terminated)
    proc.stop_manager()
    proc.reset()