ReqMgr2 MicroService Transferor
This documentation describes the architecture, behaviour and APIs of the ReqMgr2 MicroService Transferor module, which is responsible for executing all the necessary input data placement before a workflow can be acquired by the global workqueue and the underlying agents.
For the record, given that these abbreviations will be mentioned several times in this document, here is their meaning:
- GQE: global workqueue element
- LQE: local workqueue element
As previously discussed, we decided not to rely on GQE information to create transfer requests, because it would be too heavy on CouchDB and thus would not deliver good performance.
This ReqMgr2-based model, though, assumes that the MicroService has to parse the request spec and determine exactly which input datasets/blocks, pileup and parent data are needed. It also assumes there is another (micro)service monitoring those transfer requests and driving work acquisition.
Here is how we envision it to work:
- Unified assigns a workflow (request transition from `assignment-approved` to `assigned`)
- MS Transferor queries for requests in `assigned` and parses their spec in order to find whether there is any input data that needs to be placed (contacting DBS to get the final list of blocks to be transferred). A given request might have the following input data types:
  - zero or one input primary dataset
  - zero or one parent dataset
  - zero to two pileup datasets
- With the overall list of data to replicate, create transfer requests based on:
  - SiteWhitelist used during the workflow assignment
  - campaign configuration
  - unified configuration
  - estimated amount of work
  - anything else from the "black box logic"
- For each workflow, we need to persist the transfer request IDs - and some extra information - in CouchDB. See this gist for details on the document schema: https://gist.github.com/amaltaro/72599f995b37a6e33566f3c749143154#file-transferor_format-json-L32
- Once all transfer requests have been successfully made, update the request status: `assigned -> staging`
- If there is nothing to be transferred (no input data at all), then update the request status once again: `staging -> staged`
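As a rough illustration of the bookkeeping above, here is a minimal sketch of how these status transitions could be driven. It is not the actual MSTransferor implementation: helper names such as `getInputData`, `createTransferRequest`, `persistTransferDoc` and `updateRequestStatus` are hypothetical placeholders.

```python
# Minimal, hypothetical sketch of the status transitions described above.
# None of these helpers are the real MSTransferor/ReqMgr2 client API.
def transferAndTransition(workflow):
    primary, parent, pileups = getInputData(workflow)  # parsed from the request spec + DBS

    if not (primary or parent or pileups):
        # nothing to transfer at all: the workflow can go straight through staging
        updateRequestStatus(workflow, "staging")
        updateRequestStatus(workflow, "staged")
        return

    transferIds = []
    for data in filter(None, [primary, parent] + pileups):
        # one transfer request per input data container
        transferIds.append(createTransferRequest(workflow, data))

    persistTransferDoc(workflow, transferIds)  # persisted in CouchDB (see the gist above)
    updateRequestStatus(workflow, "staging")   # MSMonitor later moves it to "staged"
```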
After this stage, we'd have to rely on the MSMonitor thread to monitor the status of the transfer requests and take actions on the requests when needed. For a full description of the MSMonitor component, please refer to: https://github.com/dmwm/WMCore/wiki/ReqMgr2-MicroService-Monitor
The MicroService is a data-service which provides a set of APIs to perform certain actions. Its general architecture is shown below:
In particular, the WMCore MicroService provides an interface to perform Unified actions, such as fetching requests from the ReqMgr2 data-service, obtaining the necessary information for data placement, and placing requests for assigned workflows into the PhEDEx data placement system.
- `/ms-transferor/data/status` provides basic information about the MicroService. It returns the following information:
{"result": [
{"microservice_version": "1.3.0.pre3", "microservice": "MSManager", "wmcore_version": "1.3.0.pre3", "status": "OK"}
]}
- `/ms-transferor/data/status?detail=True` provides detailed information about the requests known to the MicroService. It returns the following information:
```
curl --cert $X509_USER_CERT --key $X509_USER_KEY -X GET -H "Content-type: application/json" https://cmsweb-testbed.cern.ch/ms-transferor/data/status?detail=True
{"result": [
    {"microservice_version": "1.3.0.pre3", "status": "OK", "execution_time": 46.331639, "failed_request_transition": 1, "start_time": "Wed, 22 Jan 2020 12:12:42 UTC", "num_datasets_subscribed": 0, "wmcore_version": "1.3.0.pre3", "nodes_out_of_space": ["T2_GR_Ioannina", "T2_CH_CERNBOX", "T2_UK_SGrid_Bristol", "T2_BR_SPRACE", "T2_CH_CERN", "T2_KR_KNU", "T2_DE_DESY", "T2_RU_SINP", "T2_TH_CUNSTDA", "T2_US_Caltech", "T1_IT_CNAF_Disk", "T2_RU_ITEP", "T2_RU_PNPI", "T2_US_Purdue", "T2_US_Wisconsin", "T2_US_UCSD", "T2_US_MIT", "T2_MY_UPM_BIRUNI", "T1_UK_RAL_Disk", "T1_US_FNAL_Disk"], "success_request_transition": 0, "error": "", "microservice": "MSManager", "total_num_requests": 1, "total_num_campaigns": 139, "num_blocks_subscribed": 0, "end_time": "Wed, 22 Jan 2020 12:13:28 UTC"}
]}
```
- `/ms-transferor/data` allows sending specific requests to the MicroService.
Post a request to process a given state:
```shell
curl -X POST -H "Content-type: application/json" -d '{"request":{"process":"assignment-approved"}}' http://localhost:8822/ms-transferor/data
```
Obtain results for a specific workflow:
```
curl --cert $X509_USER_CERT --key $X509_USER_KEY -X POST -H "Content-type: application/json" -d '{"request":{"task":"amaltaro_StepChain_DupOutMod_Mar2019_Validation_190322_105219_7255"}}' https://cmsweb-testbed.cern.ch/ms-transferor/data
{"result": [
    {"amaltaro_StepChain_DupOutMod_Mar2019_Validation_190322_105219_7255": {"completed": 100}}
]}
```
This section describes how the Workload Management system works with data, which types of workflows and input data are handled in WM, and how to take the most advantage of smart data placement.
The WMCore Workload Management system has two work granularities:
- First, it starts with chunks of work, which are created by Global WorkQueue; each input block corresponds to a chunk of the workflow that can be acquired by any WMAgent belonging to the same target team name.
  - The exception is Monte Carlo from scratch, where there is no input data and the chunk of work corresponds to a specific number of jobs.
- The second granularity level is the grid job itself, which is associated with a higher level workqueue element (or input block); each workqueue element can spawn 1 to many grid jobs.
A workflow can have multiple different input data types, or no input at all. The possibilities are:
- no input data at all: which means the workflow starts from the event generation step;
- primary input data: the signal to be processed in the workflow, described through the `InputDataset` parameter (which can be either at the top level workflow description or in the first task/step description); each workflow can have 0 or 1 primary input dataset.
- secondary input data: the pileup dataset(s) to be used in the workflow, normally described through the `MCPileup` parameter, even though it can also be set via `DataPileup` (these parameters can be either at the top level workflow description, or inside any task/step dictionary); each workflow can have from 0 to a few pileup datasets (likely not beyond 2 pileups).
- parent input data: on top of the signal to be processed - which means the workflow has an input primary dataset - the workflow will also process events from the parent dataset (the dataset which originated the input dataset). Such workflows are defined through the `IncludeParents = true` parameter (following the same InputDataset convention). There cannot be a parent dataset if an input primary dataset is not defined. Each workflow has either 0 or 1 parent dataset.
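To make these parameters more concrete, here is a hedged sketch of how such fields might appear in a request dictionary. The dataset names and the overall structure are simplified, hypothetical examples; real ReqMgr2 specs carry many more parameters.

```python
# Simplified, hypothetical request fragments illustrating the input data
# parameters discussed above (real ReqMgr2 specs contain many more fields).
reproc_request = {
    "RequestType": "ReReco",
    "InputDataset": "/SingleMuon/Run2018A-v1/RAW",  # 0 or 1 primary input dataset
    "IncludeParents": True,                         # also read events from the parent dataset
}

mc_request = {
    "RequestType": "TaskChain",
    # no InputDataset at all: the workflow starts from the event generation step
    "Task1": {"MCPileup": "/SomeMinBias/CampaignY-v1/GEN-SIM"},  # pileup set via MCPileup (or DataPileup)
}
```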
Given what has been described above, we should pursue block level data placement whenever possible, such that disk space is used wisely and more data can be staged (thus more workflows can get into running). Given that data distribution will follow a block model, we also need to know the total block size (in bytes), such that we can evenly distribute data among different disk resources (block sizes can vary from MBs to TBs, so we cannot blindly count the number of blocks in each subscription).
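As a toy illustration of why block sizes matter, here is a hypothetical greedy sketch that spreads blocks across sites by accumulated size rather than by block count. This is not the actual MSTransferor algorithm, just a sketch of the idea.

```python
# Toy, hypothetical sketch: distribute blocks across sites by accumulated size,
# not by block count (blocks can range from MBs to TBs). Not MSTransferor code.
def distribute_blocks(block_sizes, sites):
    """block_sizes: dict of block name -> size in bytes; sites: list of site names."""
    placement = {site: [] for site in sites}
    placed_bytes = {site: 0 for site in sites}
    # place the largest blocks first, always on the currently least loaded site
    for block, size in sorted(block_sizes.items(), key=lambda item: item[1], reverse=True):
        target = min(sites, key=placed_bytes.get)
        placement[target].append(block)
        placed_bytes[target] += size
    return placement

print(distribute_blocks({"blockA#1": 5e12, "blockA#2": 2e9, "blockA#3": 3e12},
                        ["T1_US_FNAL_Disk", "T2_DE_DESY"]))
```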
Unified has extensive logic and monitoring (using data management monitoring sources) of the CMS disk storage; one should read the Unified code to see the full logic that was implemented. However, there are some basic rules that we can already describe here:
- premix pileup: there are usually only a few of these and they are very large secondary input datasets, commonly staged in full at the same location (actually two copies are made, one in the US and the other in Europe). Given that such workflows have low I/O requirements, jobs are set to read data remotely and the workflow usually runs at many different sites. Usually there is no data placement to be performed, because the dataset is already available at a couple of sites. However, if needed, the whole dataset will get subscribed to a given location.
- classical pileup: there are many such secondary input datasets, of varied sizes. Given that such workflows require high I/O, jobs have to be sent where the data is located, in order to decrease WAN bandwidth needs. Initially we will be working with `dataset` level subscriptions. However, if the dataset is too large, we might consider distributing a fraction of it at `block` level, but that option needs to be carefully analysed to make sure there is a good distribution of pileup events compared to the input signal. Further details can be checked in the Unified source code.
- primary + pileup data: many workflows require both of these input data types. For such cases, we need to make sure that the primary/parent input blocks are made available at the same locations holding the pileup dataset in full. Exceptions apply when AAA flags are set, in which case either the primary/parent or the pileup dataset could be read remotely.
- primary data: the main input dataset to be processed; jobs can read it either locally or remotely. Its data placement is made at `block` level because there are usually many sites processing the same workflow (an exception might be relvals), such that data and jobs get evenly distributed among sites (well, maybe normalised according to their CPUs/storage).
- parent data: rarely used, but there are workflows that require both the input primary data and its parent. In such cases, we need to make sure that a given input block (and ALL its parent blocks) are placed at the same storage, otherwise work acquisition gets stuck and no jobs get created for it. In short, it follows the same `block` level subscription as the primary input.
Premix pileup: Rucio can easily be made to have the concept of countries or regions. We would need two rules to place premixed pileup: one rule to place a full copy at FNAL, and one rule to place a copy in "Europe && Tier1". By specifying the grouping as "dataset" (CMS block), the Europe copy would be distributed over the European Tier1s. If certain Tier2s wanted to be hosts as well, one could easily add a bit of metadata to the sites, like "reliable sites".
Classic pileup: Lots of options. For instance, placing 5 copies among 10 "reliable sites" would give each site about half of the pileup dataset and give you 10 sites where you could run workflows reading it. This can be done with a single rule. If you need to surge production using this dataset, make a second rule requiring 8 copies; when you are done running, delete that second rule and it will go back down to 5 copies.
Primary data: This is a direct translation. A Rucio container distributed at the Rucio dataset (CMS block) granularity among the relevant sites or group of sites.
Parent data: It is tricky to get the right parents at the right sites. You could micromanage this at block level, with a rule per block, or you could just make sure the parent and child datasets are both present at the same site. You could even make a higher level container which contains both the parent and the child, and require that it be placed completely at one site. That way Rucio can pick the site and, by deleting one rule, you clean everything up.
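To illustrate the rule-based placement discussed above, here is a hedged sketch using the Rucio Python client. The scope, container names and RSE expressions are made-up examples, and attributes such as `reliable_site` are assumed to exist in the CMS Rucio configuration; this is not the MSTransferor implementation.

```python
from rucio.client.ruleclient import RuleClient

rule_client = RuleClient()

# Premix pileup: two rules, one full copy at FNAL and one copy spread over
# European Tier1s. grouping="DATASET" distributes at CMS-block granularity.
# Container name and RSE expressions below are illustrative only.
premix = [{"scope": "cms", "name": "/SomePremixPileup/CampaignX-v1/PREMIX"}]
rule_client.add_replication_rule(premix, copies=1, rse_expression="T1_US_FNAL_Disk", grouping="ALL")
rule_client.add_replication_rule(premix, copies=1, rse_expression="region=europe&tier=1", grouping="DATASET")

# Classic pileup: e.g. 5 copies spread among a set of "reliable" sites, so each
# site ends up hosting a fraction of the dataset while many sites can run the work.
classic = [{"scope": "cms", "name": "/SomeMinBias/CampaignY-v1/GEN-SIM"}]
rule_client.add_replication_rule(classic, copies=5, rse_expression="reliable_site=True", grouping="DATASET")
```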
During every cycle of the MSTransferor, we collect storage information such as:
- quota for a given group/account
- data storage committed and used; this summary is then recorded in the logs, e.g.:
```
2020-03-26 23:07:33,647:INFO:RSEQuotas: Summary of the current quotas (in TB - Terabytes):
2020-03-26 23:07:33,647:DEBUG:RSEQuotas: T1_DE_KIT_Disk: bytes_limit: 2676.00, bytes_used: 3167.57, bytes_remaining: -491.57, quota: 2676.00, quota_avail: -491.57
2020-03-26 23:07:33,648:DEBUG:RSEQuotas: T1_ES_PIC_Disk: bytes_limit: 882.00, bytes_used: 2161.76, bytes_remaining: -1279.76, quota: 882.00, quota_avail: -1279.76
...
```
where:
- `bytes_limit` and `quota` represent the quota for a given group/account, i.e., the maximum amount of data that can be placed under that group/account at a specific site.
- `bytes_used` represents how much data has already been subscribed and approved for a given group/account (thus, not necessarily all transferred over).
- `bytes_remaining` represents how many bytes are free, calculated as `quota - bytes_used`; that is why it is sometimes negative.
- `quota_avail` represents a fraction of the total group/account quota. This fraction is a configurable parameter and it is currently set to 80%.

In short, `quota_avail` is the metric used to evaluate whether MSTransferor can or cannot place data at a given site.
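A minimal sketch of this bookkeeping is shown below. It is based only on the fields described above (the 80% fraction and the field names follow the log excerpt); the function itself is hypothetical and not the actual RSEQuotas/MSTransferor code.

```python
# Hypothetical sketch of the quota bookkeeping described above; this is not
# the actual MSTransferor/RSEQuotas implementation.
QUOTA_FRACTION = 0.8  # configurable: fraction of the group/account quota considered usable

def usable_space_tb(quota_tb, bytes_used_tb):
    """Return how much space (in TB) could still be used at a site for this account."""
    usable_quota = QUOTA_FRACTION * quota_tb     # only 80% of the quota is considered
    return usable_quota - bytes_used_tb          # negative means the site is out of space

# T1_DE_KIT_Disk numbers from the log excerpt above: the result is negative,
# so no new data placement would be made at that site.
print(usable_space_tb(2676.00, 3167.57))
```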
In order to properly test the MSTransferor and MSMonitor services, we need to inject the test campaigns (and the real campaigns available in the Unified database/MongoDB). Here is what we can do at the moment, until we have a way to keep the Unified campaigns synchronized in central CouchDB.
- Log in to vocms0274 as yourself and:
```shell
cd /data/admin  # or another directory that you like
```
- Download the parse script from the WMCore GitHub repository:
```shell
curl https://raw.githubusercontent.com/dmwm/WMCore/master/bin/adhoc-scripts/parseUnifiedCampaigns.py > parseUnifiedCampaigns.py
```
- Now get all the campaigns from the local MongoDB, parse them and convert them to the WMCore schema. The `fout` option specifies an output file where the campaign information is dumped in JSON format. Run the script as follows:
```shell
python parseUnifiedCampaigns.py --dburi=mongodb://localhost:27017 --dbname=unified --dbcoll=campaignsConfiguration --verbose=10 --fout=output.json
```
- Now you need to move the output file (in this example, `output.json`) to a node with the WMCore environment, or to be more precise, with the ReqMgrAux library. Once there and with the file in the current directory, you can source the WMCore environment and inject all the campaigns, plus those hardcoded as test campaigns:
```shell
source apps/wmagent/etc/profile.d/init.sh
python parseUnifiedCampaigns.py --fin=output.json --url=https://alancc7-cloud2.cern.ch/reqmgr2 --verbose=10 --testcamp
```
Note that you need to specify the instance into which you want to inject these campaigns; in this example it was `alancc7-cloud2`.
In addition, you might need to add extra test campaigns to the source code (refer to the `insertTestCampaigns` method).