
ReqMgr2 MicroService Transferor


This documentation describes the architecture, behaviour and APIs of the ReqMgr2 MicroService Transferor module, which is responsible for executing all the necessary input data placement before a workflow can be acquired by the global workqueue and the underlying agents.

For the record - given that these abbreviations will be mentioned several times in this document - here is their meaning:

  • GQE: global workqueue element
  • LQE: local workqueue element

Architecture proposal based on request status in ReqMgr2

As previously discussed, we decided not to rely on GQE information to create transfer requests, because it would be too heavy on CouchDB and thus would not deliver good performance.

This ReqMgr2-based model, though, assumes that the MicroService has to parse the request spec and find out exactly which input datasets/blocks, pileup and parent data are required. It also assumes there is another (micro)service monitoring those transfer requests and driving work acquisition.

Here is how we envision it to work:

  1. Unified assigns a workflow (the request transitions from assignment-approved to assigned)
  2. MSTransferor queries for requests in assigned and parses their spec to find out whether there is any input data that needs to be placed (contacting DBS to get the final list of blocks to be transferred). A given request might have the following input data types (a sketch of the whole flow follows this list):
    • zero or one input primary dataset
    • zero or one parent dataset
    • zero to two pileup datasets
  3. With the overall list of data to replicate, create transfer requests based on:
    • the SiteWhitelist used during workflow assignment
    • the campaign configuration
    • the unified configuration
    • the estimated amount of work
    • anything else from the "black box logic"
  4. For each workflow, persist the transfer request IDs - and some extra information - in CouchDB. See this gist for details on the document schema: https://gist.github.com/amaltaro/72599f995b37a6e33566f3c749143154#file-transferor_format-json-L32
  5. Once all transfer requests have been successfully made, update the request status assigned -> staging
    • if there is nothing to be transferred (no input at all), then update the request status once again staging -> staged
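
Below is a minimal, illustrative Python sketch of that flow. The helper names (getAssignedRequests, resolveInputBlocks, makeTransferRequests, saveTransferDoc, updateRequestStatus) are hypothetical placeholders for the corresponding ReqMgr2, DBS, transfer-system and CouchDB calls, not the actual MSTransferor implementation:

def transferorCycle(reqmgr, dbs, transferSystem, couch):
    """One illustrative MSTransferor cycle (hypothetical helper objects)."""
    # 1. fetch workflows sitting in the "assigned" status from ReqMgr2
    for wflow in reqmgr.getAssignedRequests():
        # 2. parse the spec and resolve primary/parent/pileup input into blocks via DBS
        inputBlocks = dbs.resolveInputBlocks(wflow)
        # 3. create transfer requests according to SiteWhitelist, campaign and unified config
        transferIds = transferSystem.makeTransferRequests(wflow, inputBlocks)
        # 4. persist the transfer request IDs (plus extra information) in CouchDB
        couch.saveTransferDoc(wflow["RequestName"], transferIds)
        # 5. advance the request status: assigned -> staging (-> staged if nothing to transfer)
        reqmgr.updateRequestStatus(wflow["RequestName"], "staging")
        if not inputBlocks:
            reqmgr.updateRequestStatus(wflow["RequestName"], "staged")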

After this stage, we'd have to rely on the MSMonitor thread to monitor the status of the transfer requests and take actions on the requests when needed. For a full description of the MSMonitor component, please refer to: https://github.com/dmwm/WMCore/wiki/ReqMgr2-MicroService-Monitor

ReqMgr2 MicroService APIs

The MicroService is a data-service which provides a set of APIs to perform certain actions. Its general architecture is shown in the MicroServiceArchitecture diagram.

In particular, the WMCore MicroService provides an interface to perform Unified actions, such as fetching requests from the ReqMgr2 data-service, obtaining the necessary information for data placement, and placing requests for assigned workflows into the PhEDEx data placement system.

Available APIs

GET APIs

  • /ms-transferor/data/status provides basic information about the MicroService. It returns the following information:
{"result": [
 {"microservice_version": "1.3.0.pre3", "microservice": "MSManager", "wmcore_version": "1.3.0.pre3", "status": "OK"}
]}
  • /ms-transferor/data/status?detail=True provides detailed information about the requests handled by the MicroService (a Python alternative to the curl call is sketched after this example). It returns the following information:
curl --cert $X509_USER_CERT --key $X509_USER_KEY -X GET -H "Content-type: application/json" https://cmsweb-testbed.cern.ch/ms-transferor/data/status?detail=True

{"result": [
 {"microservice_version": "1.3.0.pre3", "status": "OK", "execution_time": 46.331639, "failed_request_transition": 1, "start_time": "Wed, 22 Jan 2020 12:12:42 UTC", "num_datasets_subscribed": 0, "wmcore_version": "1.3.0.pre3", "nodes_out_of_space": ["T2_GR_Ioannina", "T2_CH_CERNBOX", "T2_UK_SGrid_Bristol", "T2_BR_SPRACE", "T2_CH_CERN", "T2_KR_KNU", "T2_DE_DESY", "T2_RU_SINP", "T2_TH_CUNSTDA", "T2_US_Caltech", "T1_IT_CNAF_Disk", "T2_RU_ITEP", "T2_RU_PNPI", "T2_US_Purdue", "T2_US_Wisconsin", "T2_US_UCSD", "T2_US_MIT", "T2_MY_UPM_BIRUNI", "T1_UK_RAL_Disk", "T1_US_FNAL_Disk"], "success_request_transition": 0, "error": "", "microservice": "MSManager", "total_num_requests": 1, "total_num_campaigns": 139, "num_blocks_subscribed": 0, "end_time": "Wed, 22 Jan 2020 12:13:28 UTC"}
]}
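
For convenience, the same detailed status query can be made from Python with the requests library; this is only a sketch, assuming a valid grid certificate/key pair and a CA path that may differ on your node:

import requests

# query the detailed MSTransferor status (same endpoint as the curl example above)
url = "https://cmsweb-testbed.cern.ch/ms-transferor/data/status"
resp = requests.get(url, params={"detail": "True"},
                    headers={"Content-type": "application/json"},
                    cert=("/path/to/usercert.pem", "/path/to/userkey.pem"),  # X509_USER_CERT / X509_USER_KEY
                    verify="/etc/grid-security/certificates")  # CA location: an assumption, adjust as needed
data = resp.json()["result"][0]
print(data["status"], data["nodes_out_of_space"])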

POST APIs (NEEDS REVIEW!!!)

  • /ms-transferor/data allows sending a specific request to the MicroService

POST a request to process a given state:

curl -X POST -H "Content-type: application/json" -d '{"request":{"process":"assignment-approved"}}' http://localhost:8822/ms-transferor/data

Obtain results about a specific workflow:

curl --cert $X509_USER_CERT --key $X509_USER_KEY -X POST -H "Content-type: application/json" -d '{"request":{"task":"amaltaro_StepChain_DupOutMod_Mar2019_Validation_190322_105219_7255"}}' https://cmsweb-testbed.cern.ch/ms-transferor/data

{"result": [
 {"amaltaro_StepChain_DupOutMod_Mar2019_Validation_190322_105219_7255": {"completed": 100}}
]}

Data placement model

This section describes how the Workload Management system works with data, which types of workflows and input data are handled in WM, and how to take the most advantage of smart data placement.

The WMCore Workload Management system has two work granularities:

  • It starts with chunks of work, which are created by Global WorkQueue; each input block corresponds to a chunk of the workflow that can be acquired by any WMAgent belonging to the same target team name.
    • An exception is Monte Carlo from scratch, where there is no input data and the chunk of work corresponds to a specific amount of jobs.
  • The second granularity level is the grid job itself, which is associated with a higher-level workqueue element (or input block); each workqueue element can spawn 1 to many grid jobs.

Input data classification

A workflow can have several different input data types, or no input at all. The possibilities are listed below; a short classification sketch follows the list:

  1. no input data at all: the workflow starts from the event generation step;
  2. primary input data: the signal to be processed in the workflow, described through the InputDataset parameter (which can be either at the top level of the workflow description or in the first task/step description); each workflow can have 0 or 1 primary input dataset.
  3. secondary input data: the pileup dataset to be used in the workflow, normally described through the MCPileup parameter, even though it can also be set via DataPileup (these parameters can be either at the top level of the workflow description or inside any task/step dictionary); each workflow can have from 0 to a few pileup datasets (likely not beyond 2 pileups).
  4. parent input data: on top of the signal to be processed - which means the workflow has an input primary dataset - the workflow will also process events from the parent dataset (the dataset which originated the input dataset). Such workflows are defined through the IncludeParents = true parameter (following the same InputDataset convention). There cannot be a parent dataset if an input primary dataset is not defined. Each workflow has either 0 or 1 parent dataset.
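
A minimal classification sketch, assuming the workflow spec is available as a flat dictionary carrying the parameters named above (InputDataset, MCPileup, DataPileup, IncludeParents); the real WMCore spec is more nested, so this is only illustrative:

def classifyInput(spec):
    """Classify the input data types of a (flattened) workflow spec dictionary."""
    primary = spec.get("InputDataset")
    pileups = [spec[key] for key in ("MCPileup", "DataPileup") if spec.get(key)]
    return {"primary": primary,                                            # 0 or 1 primary dataset
            "pileup": pileups,                                             # 0 to a few pileup datasets
            "parent": bool(primary) and bool(spec.get("IncludeParents")),  # parent requires a primary
            "gen_only": not primary and not pileups}                       # no input: generation from scratch

# hypothetical example: a workflow with one primary dataset and one pileup, no parent
print(classifyInput({"InputDataset": "/PrimaryDS/CampaignA-v1/AOD",
                     "MCPileup": "/PUDataset/CampaignB-v1/PREMIX"}))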

Basic rules for data placement

Given what has been described above, we should pursue block-level data placement whenever possible, so that disk space is used wisely and more data can be staged (thus more workflows can get into running). Given that data distribution follows a block model, we also need to know the total block size (in bytes) so that we can evenly distribute data among the different disk resources (block sizes can vary from MB to TB, so we cannot blindly count the number of blocks in each subscription).

Unified has extensive logic and monitoring (using data management monitoring sources) of the CMS disk storage, and one should read the Unified code to see the whole logic that got implemented. However, there are some basic rules that we can already describe here (a small block-distribution sketch follows this list):

  • premix pileup: there are usually only a few of those and they are very large secondary input datasets, commonly staged in full at the same location (actually two copies are made, one in the US and the other in Europe). Given that such workflows have low I/O requirements, jobs are set to read data remotely and the workflow usually runs at many different sites. Usually there is no data placement to be performed, because the dataset is already available at a couple of sites. However, if needed, the whole dataset gets subscribed to a given location.
  • classical pileup: there are many such secondary input datasets, of varied sizes; given that such workflows require high I/O, jobs have to be sent to where the data is located in order to decrease WAN bandwidth needs. Initially we will work with dataset-level subscriptions. However, if the dataset is too large, we might consider distributing a fraction of it at block level, but that option needs to be carefully analysed to make sure there is a good distribution of pileup events compared to the input signal. Further details can be checked in the Unified source code.
  • primary + pileup data: many workflows require both of these input data types. For such cases, we need to make sure that the primary/parent input blocks are made available at the same locations holding the pileup dataset in full. Exceptions apply when AAA flags are set, though, where either the primary/parent or the pileup dataset could be read remotely.
  • primary data: the main input dataset to be processed; jobs can read it either locally or remotely. Its data placement is made at block level because there are usually many sites processing the same workflow (an exception might be relvals), such that data and jobs get evenly distributed among sites (possibly normalised according to their CPUs/storage).
  • parent data: rarely used, but there are workflows that require both the input primary data and its parent. In such cases, we need to make sure that a given input block (and ALL its parent blocks) are placed at the same storage, otherwise work acquisition gets stuck and no jobs get created for it. In short, it follows the same block-level subscription as the primary input.
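
As an illustration of the size-aware distribution mentioned above, here is a small greedy sketch that spreads blocks over a list of RSEs by total bytes rather than by block count; the block names and sizes are made up, and the real MSTransferor logic also folds in quotas, campaign and unified configuration:

def distributeBlocks(blockSizes, rses):
    """Greedily assign blocks to RSEs so that the total bytes per RSE stay balanced."""
    assigned = {rse: [] for rse in rses}
    usage = {rse: 0 for rse in rses}
    # place the largest blocks first, always on the currently least-loaded RSE
    for block, size in sorted(blockSizes.items(), key=lambda item: -item[1]):
        target = min(usage, key=usage.get)
        assigned[target].append(block)
        usage[target] += size
    return assigned

# hypothetical block sizes (in bytes) and candidate RSEs taken from the SiteWhitelist
blocks = {"block#A": 5e12, "block#B": 2e12, "block#C": 1e12, "block#D": 4e12}
print(distributeBlocks(blocks, ["T1_US_FNAL_Disk", "T2_DE_DESY"]))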

Input data placement logic with Rucio

  1. Using DBS, perform the data discovery (which rucio datasets have to be processed by the workflow);
  2. OPTION-A) if possible, retrieve all the rules created for the rucio container and dataset in a single query; OPTION-B) otherwise, retrieve all the rules created for each rucio dataset (one call per rucio dataset)
    1. skip rucio datasets that are not needed by the workflow
    2. check if the rucio rule belongs to the account "wmcore_transferor" and if the RSE expression matches our SiteWhitelist (how to evaluate the RSE expr?)
    3. if there is a rule, reuse it and persist this rule for that given workflow (thus, will be evaluated by MSMonitor)
    4. if there is NO rule, then keep this rucio dataset in the queue to have a rule created
    5. do we make a rule for every rucio dataset? or could we distribute many rucio datasets - against the same RSE - in the same rule?

NOTE: pileup samples are placed as a whole at the same location. That said, proceed as follows (a Rucio client sketch follows these steps):

  1. retrieve all the rules for a rucio container
    1. check if the rucio rule belongs to the account "wmcore_transferor" and if the RSE expression matches our SiteWhitelist (how to evaluate the RSE expr?)
    2. if there is a rule, reuse it and persist this rule for that given workflow (thus, will be evaluated by MSMonitor)
    3. if there is NO rule, then keep this rucio container in the queue to have a rule created
  2. if the workflow requires input and pileup dataset, those need to target the same RSE (thus no wildcard allowed)
  3. OPTION-A) if the workflow requires only input dataset - AND the whole dataset - then we could have a single rule with grouping=DATASET and an RSE expression covering all sites in the SiteWhitelist (how to do that?); OPTION-B) if the workflow requires only input dataset - AND only some blocks of it - we could either:
    1. make a single rule for a list of rucio datasets, provide all the RSEs matching the SiteWhitelist
    2. make multiple rules - one rule for each rucio dataset - if possible providing all the RSEs matching the SiteWhitelist
  4. if the workflow requires only pileup dataset, we can make a single rule for the rucio container providing all the RSEs matching the SiteWhitelist
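
A minimal sketch of the rule lookup/creation above, using the standard Rucio client calls list_did_rules and add_replication_rule; the scope, DID name, RSE expression and matching logic are illustrative assumptions (in particular, a real implementation would have to evaluate the RSE expression against the SiteWhitelist, which is the open question noted above):

from rucio.client import Client

client = Client(account="wmcore_transferor")

def ensureRule(scope, name, rseExpression, grouping="DATASET"):
    """Reuse an existing wmcore_transferor rule for this DID, or create a new one."""
    for rule in client.list_did_rules(scope, name):
        # naive match: a proper check would evaluate the RSE expression against the SiteWhitelist
        if rule["account"] == "wmcore_transferor" and rule["rse_expression"] == rseExpression:
            return rule["id"]
    # no suitable rule found: create one (single copy; DATASET grouping keeps each block together per RSE)
    ruleIds = client.add_replication_rule(dids=[{"scope": scope, "name": name}],
                                          copies=1, rse_expression=rseExpression,
                                          grouping=grouping, account="wmcore_transferor",
                                          comment="MSTransferor input data placement")
    return ruleIds[0]

# e.g. place a whole (hypothetical) pileup container at one of two illustrative RSEs
ruleId = ensureRule("cms", "/SomePileup/Campaign-v1/PREMIX", "T1_US_FNAL_Disk|T2_DE_DESY", grouping="ALL")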

Ideas for translation to Rucio

Premix pileup: Rucio can easily be made to have the concept of countries or regions. We'd need two rules to place premixed pileup: one rule to place a full copy at FNAL and one rule to place a copy in "Europe && Tier1". By specifying the grouping as "dataset" (CMS block), the European copy would be distributed over the European Tier1s. If certain Tier2s wanted to be hosts as well, one could easily add a bit of metadata to the sites, like "reliable sites".

Classic pileup: Lots of options. For instance, placing 5 copies among 10 "reliable sites" would give each site about 1/2 of the pileup dataset and give you 10 sites where you could run workflows reading it. This can be done with a single rule. If you need to surge production using this dataset, make a rule requiring 8 copies. Then when you are done running, delete the 2nd rule and it will go back down to 5 copies.
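
Those two cases could be expressed with the same add_replication_rule call shown earlier; the DID name and the RSE attributes used in the expressions below (region, tier, reliable) are hypothetical and would have to match whatever attributes are actually set on the CMS RSEs:

from rucio.client import Client

client = Client(account="wmcore_transferor")
did = [{"scope": "cms", "name": "/SomePremixPileup/Campaign-v1/PREMIX"}]

# premix pileup: one full copy at FNAL, plus one copy spread over European Tier1s
# (grouping="DATASET" distributes the container at CMS-block level)
client.add_replication_rule(dids=did, copies=1, rse_expression="T1_US_FNAL_Disk", grouping="ALL")
client.add_replication_rule(dids=did, copies=1, rse_expression="region=europe&tier=1", grouping="DATASET")

# classic pileup: 5 copies spread over "reliable sites"; for a surge, add a temporary
# 8-copy rule and delete it afterwards to fall back to 5 copies
client.add_replication_rule(dids=did, copies=5, rse_expression="reliable=true", grouping="DATASET")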

Primary data: This is a direct translation. A Rucio container distributed at the Rucio dataset (CMS block) granularity among the relevant sites or group of sites.

Parent data: This is tricky, since we need the right parents at the right sites. You could micromanage this at a block-by-block level with a rule per block, or you could just make sure the parent and child datasets are both present at the same site. You could even make a higher-level container containing both the parent and the child, and require that it be placed completely at one site. That way Rucio could pick the site and, by deleting one rule, you clean everything up.

About the RSE quota report

During every cycle of the MSTransferor, we collect storage information such as:

  • the quota for a given group/account
  • the data storage committed and used
That summary is then recorded in the logs, e.g.:
2020-03-26 23:07:33,647:INFO:RSEQuotas: Summary of the current quotas (in TB - Terabytes):
2020-03-26 23:07:33,647:DEBUG:RSEQuotas:   T1_DE_KIT_Disk:		bytes_limit: 2676.00, bytes_used: 3167.57, bytes_remaining: -491.57, quota: 2676.00, quota_avail: -491.57
2020-03-26 23:07:33,648:DEBUG:RSEQuotas:   T1_ES_PIC_Disk:		bytes_limit: 882.00, bytes_used: 2161.76, bytes_remaining: -1279.76, quota: 882.00, quota_avail: -1279.76
...

where:

  • bytes_limit and quota represent the quota for a given group/account, i.e., it's the maximum amount of data that can be placed under that group/account at a specific site.
  • bytes_used represents how much data has already been subscribed and approved for a given group/account (thus, not necessarily all transferred over)
  • bytes_remaining represents how many bytes are free, computed as quota - bytes_used; that is why it is sometimes negative.
  • quota_avail represents a fraction of the total group/account quota. This is a configurable parameter and it's currently set to 80%.

In short, quota_avail is the metric used to evaluate whether MSTransferor can or cannot put data in a given site.
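
A minimal sketch of that check, assuming an RSE record carrying the fields printed in the log summary above (values in TB); the decision rule itself (enough quota_avail for the pending data volume) is an illustration rather than the exact MSTransferor algorithm:

TERABYTE = 1e12

def canPlaceData(rseRecord, pendingBytes):
    """Decide whether an RSE still has room for pendingBytes of new data."""
    # quota_avail is expressed in TB in the log summary above
    return rseRecord["quota_avail"] * TERABYTE >= pendingBytes

kit = {"rse": "T1_DE_KIT_Disk", "quota": 2676.00, "bytes_used": 3167.57, "quota_avail": -491.57}
print(canPlaceData(kit, 50 * TERABYTE))   # False: the site has no available quota left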

Injecting Unified campaigns into central CouchDB (plus test campaigns)

In order to properly test the MSTransferor and MSMonitor services, we need to inject the test campaigns (and the real campaigns available in the Unified database/MongoDB). Here is what we can do at this very moment, until we have a way to keep the unified campaigns synchronized in central CouchDB.

  1. Log in to vocms0274 as yourself and:
cd /data/admin  # or another directory that you like
  2. Download the parse script from the WMCore GitHub repository:
curl https://raw.githubusercontent.com/dmwm/WMCore/master/bin/adhoc-scripts/parseUnifiedCampaigns.py > parseUnifiedCampaigns.py
  3. Get all the campaigns from the local MongoDB, parse them and convert them to the WMCore schema. The fout option specifies an output file to dump the campaign information in JSON format. Run the script as follows:
python parseUnifiedCampaigns.py --dburi=mongodb://localhost:27017 --dbname=unified --dbcoll=campaignsConfiguration --verbose=10 --fout=output.json
  4. Move the output file (in this example, output.json) to a node with the WMCore environment or, to be more precise, with the ReqMgrAux library. Once there, and with the file in your working directory, source the WMCore environment and inject all the campaigns plus those hardcoded as test:
source apps/wmagent/etc/profile.d/init.sh
python parseUnifiedCampaigns.py --fin=output.json --url=https://alancc7-cloud2.cern.ch/reqmgr2 --verbose=10 --testcamp

Note that you need to specify the instance into which you want to inject these campaigns; in my case it was alancc7-cloud2. In addition, you might need to add extra test campaigns to the source code (refer to the insertTestCampaigns method).
