ThreadProcessor

Drinking Kazu edited this page Mar 6, 2018 · 4 revisions

Here we discuss ThreadProcessor, a critical C++ API for loading larcv data quickly for network training. The discussion is divided into 3 parts.

What is ThreadProcessor?

ThreadProcessor is a tool that runs multiple ProcessDriver instances on different threads. It is designed specifically for fast data reading to fill batches of data for training deep neural networks, and it gets its speed from multithreading.

ThreadProcessor configuration looks very similar to that of a ProcessDriver. In fact, there are only 2 additional parameters, NumThreads and NumStorage.

  • NumStorage ... defines the size of a circular-buffer (CB). Each element of a CB holds one batch of data (the batch size is set at run-time).
  • NumThreads ... defines the number of threads used to fill a CB. Typically set to the same value as NumStorage.

Below is an example taken from here.

    ThreadProcessor: {
      Verbosity:    2
      EnableFilter: false
      RandomAccess: false
      InputFiles: ["copy00.root"]
      ProcessType:  ["BatchFillerImage2D","BatchFillerImage2D"]
      ProcessName:  ["test","label"]
      NumThreads: 6
      NumStorage: 6
      
      ProcessList: {
        test: {
          CaffeMode: false
          Verbosity: 2
          ImageProducer: "data"
          Channels: [0,1,2]
        }
        label: {
          CaffeMode: false
          Verbosity: 2
          ImageProducer: "segment"
          Channels: [0,1,2]
        }
      }
    }

Circular-Buffer and BatchFillerX

Here we go into a bit more detail about what a CB is and how it is implemented and filled. We start by introducing a piece of jargon, BatchFillerX, to denote the set of C++ classes responsible for filling CBs.

What is BatchFillerX?

In this document (i.e. not in our code!), BatchFillerX denotes the set of C++ classes that inherit from BatchFillerTemplate, an abstract base class that fills batch data of type T. A few important facts about BatchFillerX:

  • Each is responsible for filling one (and only one = unique) batch data of a certain type T.
    • Type T can be int16, uint16, int32, uint32, int64, uint64, float32, or float64.
  • Their direct base class, BatchFillerTemplate, implements the mechanism for filling and clearing batch data.
  • They inherit from ProcessBase, so each has a unique string name at run-time.
  • They inherit from BatchHolder, which overloads the ProcessBase::is(std::string) function.
    • It returns true for is("BatchFiller").
    • ThreadProcessor uses this (and you can too) to identify which ProcessBase instances are BatchFillerX.
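
The name-based identification described in the list above can be pictured with a small pure-Python sketch. It does not use larcv at all: every class name here (ToyProcessBase, ToyBatchHolder, ToyBatchFillerImage2D, ToyOtherProcess) is a made-up stand-in, and the method is called is_ only because is is a reserved word in Python.

```python
class ToyProcessBase:
    """Hypothetical stand-in for ProcessBase: a process with a run-time name."""

    def __init__(self, name):
        self.name = name

    def is_(self, question):
        # A plain process answers "no" to any identity question.
        return False


class ToyBatchHolder(ToyProcessBase):
    """Hypothetical stand-in for BatchHolder: answers yes to "BatchFiller"."""

    def is_(self, question):
        return question == "BatchFiller"


class ToyBatchFillerImage2D(ToyBatchHolder):
    """One kind of BatchFillerX in this toy model."""


class ToyOtherProcess(ToyProcessBase):
    """Any process that does not fill batch data."""


processes = [
    ToyBatchFillerImage2D("test"),
    ToyOtherProcess("something_else"),
    ToyBatchFillerImage2D("label"),
]

# Pick out the batch fillers the same way the text describes: by asking each one.
filler_names = [p.name for p in processes if p.is_("BatchFiller")]
print(filler_names)  # ['test', 'label']
```

The point of the design is that the caller never needs to know the concrete class; asking the process by string is enough to separate batch fillers from everything else.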

For example, in the configuration script shown above, BatchFillerImage2D is one kind of BatchFillerX; it fills batch data of type float32 from the larcv::Image2D data in the input larcv file. The assigned batch data is uniquely identified by its name (test and label in this case).

Where/What is Circular-Buffer (CB)?

A unique CB is created for each instance of BatchFillerX. All CBs are owned and maintained by the BatchDataStorageFactory of type T. Each CB is uniquely identified by the name of its BatchFillerX and can be accessed through the attribute function BatchDataStorageFactory::get_storage(std::string name). The size of a CB, which can be obtained from BatchDataStorage::num_batch(), is the same as the value of the NumStorage configuration parameter described earlier.
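
As a rough mental model of this ownership scheme, here is a pure-Python sketch of a factory that keeps one fixed-size buffer per filler name. It is not the larcv implementation: the classes ToyStorageFactory and ToyBatchDataStorage and the helpers make_storage and set_batch are hypothetical, while get_storage, num_batch, and get_batch only mirror the names used on this page.

```python
class ToyBatchDataStorage:
    """Toy circular buffer: a fixed number of slots, one batch per slot."""

    def __init__(self, num_storage):
        self._slots = [None] * num_storage

    def num_batch(self):
        # Size of the buffer, i.e. the NumStorage value it was created with.
        return len(self._slots)

    def set_batch(self, storage_id, data):  # hypothetical helper
        self._slots[storage_id] = data

    def get_batch(self, storage_id):
        return self._slots[storage_id]


class ToyStorageFactory:
    """Toy owner of all buffers, keyed by the filler's unique name."""

    def __init__(self):
        self._storages = {}

    def make_storage(self, name, num_storage):  # hypothetical helper
        if name in self._storages:
            raise ValueError("storage name already in use: " + name)
        self._storages[name] = ToyBatchDataStorage(num_storage)

    def get_storage(self, name):
        return self._storages[name]


factory = ToyStorageFactory()
for name in ("test", "label"):
    factory.make_storage(name, num_storage=6)

# Fill slot 0 of the "test" buffer with a dummy batch of 10 values.
factory.get_storage("test").set_batch(0, [0.0] * 10)
print(factory.get_storage("label").num_batch())  # 6
```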

How ThreadProcessor works

Now that you have learned about BatchFillerX and CBs, we can discuss how ThreadProcessor works in the big picture. Upon configuration with NumStorage=M, ThreadProcessor creates:

  • Circular-buffer(s) of size M
  • M instance(s) of ProcessDriver

Each ProcessDriver instance is dedicated to filling a unique element of the CBs. Typically we use multiple CBs of size M. In an image-classification example, you want separate CBs for the image data and the image labels. In the example configuration script above, there are 2 CBs of the same type (the type defined by BatchFillerImage2D) but with different names, test and label.

Once configured, ThreadProcessor is ready to run. You can run in 2 ways:

  1. Call ThreadProcessor::batch_process(size_t batch_size).

    • It creates a new thread to fill a CB with the number of entries given by batch_size.
    • If the number of already-running threads exceeds N, the NumThreads setting, it does nothing and returns false.
  2. Call ThreadProcessor::start_manager(size_t batch_size).

    • This instantiates a manager thread which continuously calls ThreadProcessor::batch_process(size_t batch_size).
    • Call ThreadProcessor::stop_manager() when you are finished (at the end of your code)!
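
The two ways of running can be mimicked with Python's standard threading module. The sketch below is a toy model and not larcv code: ToyThreadProcessor and num_filled are hypothetical, the "fill" is just a short sleep, and the toy refuses a new fill once N workers are active, which is an assumption about the exact threshold.

```python
import threading
import time


class ToyThreadProcessor:
    """Toy model: at most num_threads fill workers run at the same time."""

    def __init__(self, num_threads):
        self._num_threads = num_threads
        self._lock = threading.Lock()
        self._running = 0   # number of fill workers currently active
        self._filled = 0    # number of batches completed so far
        self._stop = threading.Event()
        self._manager = None

    def _fill(self, batch_size):
        time.sleep(0.01)    # pretend to read batch_size entries
        with self._lock:
            self._filled += 1
            self._running -= 1

    def batch_process(self, batch_size):
        """Start one fill worker; return False if all workers are busy."""
        with self._lock:
            if self._running >= self._num_threads:
                return False
            self._running += 1
        threading.Thread(target=self._fill, args=(batch_size,), daemon=True).start()
        return True

    def start_manager(self, batch_size):
        """Start a manager thread that keeps calling batch_process."""
        self._stop.clear()

        def loop():
            while not self._stop.is_set():
                if not self.batch_process(batch_size):
                    time.sleep(0.001)   # all workers busy, try again shortly

        self._manager = threading.Thread(target=loop, daemon=True)
        self._manager.start()

    def stop_manager(self):
        self._stop.set()
        if self._manager is not None:
            self._manager.join()
            self._manager = None

    def num_filled(self):
        with self._lock:
            return self._filled


proc = ToyThreadProcessor(num_threads=2)
proc.start_manager(batch_size=10)
time.sleep(0.2)             # let the manager work for a moment
proc.stop_manager()
print("batches filled:", proc.num_filled())
```

With the manager running, the caller never has to call batch_process by hand, which is why the manager route is the convenient one.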

As you may guess, you almost always want to use the start_manager function. Here are two more important functions that you may use frequently:

  • ThreadProcessor::storage_status_array()
    • returns an array of statuses defined by the enum larcv::BatchDataState_t.
    • tells you which batch data is ready to use (i.e. has finished being filled).
    • the index into the array corresponds to a particular storage id.
  • ThreadProcessor::release_data(size_t storage_id)
    • flags a particular CB to be available for refilling (i.e. you are done using this data).
    • storage_id is a non-negative integer smaller than M, corresponding to an index of the status array.
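
The interplay between the status array and release_data can be sketched as a tiny state machine in pure Python. This is only an illustration: ToyState, ToyStatusBoard, begin_fill, end_fill, and ready_ids are hypothetical names, and the only state value taken from this page is 3 for "filled", the value the Python example on this page checks for. The other values are assumptions.

```python
from enum import IntEnum


class ToyState(IntEnum):
    EMPTY = 1     # assumed value: slot can be (re)filled
    FILLING = 2   # assumed value: a thread is writing into the slot
    FILLED = 3    # the value the example on this page treats as ready


class ToyStatusBoard:
    """Toy status tracker for M storage slots."""

    def __init__(self, num_storage):
        self._status = [ToyState.EMPTY] * num_storage

    def storage_status_array(self):
        # One status per storage id; the index is the storage id.
        return list(self._status)

    def begin_fill(self, storage_id):  # hypothetical helper
        if self._status[storage_id] != ToyState.EMPTY:
            raise RuntimeError("slot is not available for filling")
        self._status[storage_id] = ToyState.FILLING

    def end_fill(self, storage_id):  # hypothetical helper
        self._status[storage_id] = ToyState.FILLED

    def release_data(self, storage_id):
        # storage_id must be in the range 0 .. M-1.
        if not 0 <= storage_id < len(self._status):
            raise IndexError("storage_id out of range")
        self._status[storage_id] = ToyState.EMPTY


def ready_ids(board):
    """Return the storage ids whose batch is ready to use."""
    statuses = board.storage_status_array()
    return [i for i, state in enumerate(statuses) if state == ToyState.FILLED]


board = ToyStatusBoard(num_storage=4)
for storage_id in (0, 2):
    board.begin_fill(storage_id)
    board.end_fill(storage_id)

ready_before = ready_ids(board)
board.release_data(0)   # done with slot 0, so it may be refilled
ready_after = ready_ids(board)
print(ready_before, ready_after)  # [0, 2] [2]
```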

The GIF animation below is a pictorial description of how ThreadProcessor works. Here, N=3 and M=4.

Python code?

Enough English text! You must be fed up and just want example code to run, right? It's actually pretty simple. Below you will find an example of running it and accessing batch data.

Just one last comment before we end with an example: although managing ThreadProcessor directly is possible through the standard Python APIs (and it's fairly simple already), there is a separate Python wrapper class that simplifies our lives further. [Read here] for details.

    import numpy as np
    from larcv import larcv

    # Literally 3 lines to: 0) instantiate, 1) configure, 2) start running ThreadProcessor
    proc = larcv.ThreadProcessor("ThreadProcessor") # This should match the name of the big configuration block
    proc.configure("test_image2d.cfg") # Example from larcv/app/ThreadIO/test/test_image2d.cfg
    proc.start_manager(10) # Using batch size 10

    # Let's create a list of circular-buffer (storage) IDs
    storage_ids  = np.arange(0,proc.storage_status_array().size())
    # Let's also create a list of BatchFillerX names & types
    filler_names = [proc.storage_name(i) for i in proc.batch_fillers()]
    filler_types = [larcv.BatchDataTypeName(proc.batch_types()[i]) for i in proc.batch_fillers()]

    try:
        # Now loop forever to keep reading data (interrupt, e.g. with Ctrl-C, to stop)
        while True:
            status_v = proc.storage_status_array()
            for storage_id in storage_ids:
                status = int(status_v[storage_id])
                if status != 3:
                    continue # Data still being filled for this circular buffer
                print("Buffer ID {:d} filled!".format(storage_id))
                # access data here
                for idx, name in enumerate(filler_names):
                    factory = larcv.BatchDataStorageFactory(filler_types[idx]).get()
                    data    = factory.get_storage(name).get_batch(storage_id).data()
                    print("Data name {:s} type {:s} size {:d}".format(name,filler_types[idx],data.size()))
                # done using data? Release so that a thread will re-fill it with new data
                proc.release_data(storage_id)
    finally:
        # Don't forget to stop the manager & reset (this ensures all threads are terminated)
        proc.stop_manager()
        proc.reset()