diff --git a/.gitignore b/.gitignore index 5965412..1d6117c 100644 --- a/.gitignore +++ b/.gitignore @@ -2,5 +2,7 @@ *.egg-info *.swp # vim swap files +.ipynb_checkpoints + build/ dist/ diff --git a/processMeerKAT/bookkeeping.py b/processMeerKAT/bookkeeping.py index 3f674fd..788c56c 100644 --- a/processMeerKAT/bookkeeping.py +++ b/processMeerKAT/bookkeeping.py @@ -187,6 +187,12 @@ def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,o msmd = msmetadata() qa = quanta() + if ',' in vis: #detect if we are combining tracks + vis = vis.split(',')[0] + combTracks = True + else: + combTracks = False + if os.path.exists('{0}/SUBMSS'.format(vis)): tmpvis = glob.glob('{0}/SUBMSS/*'.format(vis))[0] else: @@ -213,6 +219,9 @@ def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,o target_str = msmd.namesforfields(targetfield)[0] + if combTracks: #currently setting visbase to the CWD in the case of combining tracks + visbase = os.getcwd().split('/')[-1] + if '.ms' in visbase and target_str not in visbase: basename = visbase.replace('.ms','.{0}'.format(target_str)) else: diff --git a/processMeerKAT/default_config_comb.txt b/processMeerKAT/default_config_comb.txt new file mode 100644 index 0000000..a949d5e --- /dev/null +++ b/processMeerKAT/default_config_comb.txt @@ -0,0 +1,97 @@ +[data] +vis = '' + +[fields] +bpassfield = '' +fluxfield = '' +phasecalfield = '' +targetfields = '' +extrafields = '' + +[slurm] # See processMeerKAT.py -h for documentation +nodes = 1 +ntasks_per_node = 8 +plane = 1 +mem = 232 # Use this many GB of memory (per node) +partition = 'Main' # SLURM partition to use +exclude = '' # SLURM nodes to exclude +time = '12:00:00' +submit = False +container = '/idia/software/containers/casa-6.6.0-modular.sif' +mpi_wrapper = 'mpirun' +name = '' +dependencies = '' +account = 'b03-idia-ag' +reservation = '' +modules = ['openmpi/4.0.3'] +verbose = False +precal_scripts = [] +postcal_scripts = [('prepareTracks.py', True, ''), ('science_image.py', False, '')] +scripts = [] + +[crosscal] +minbaselines = 4 # Minimum number of baselines to use while calibrating +chanbin = 1 # Number of channels to average before calibration (during partition) +width = 1 # Number of channels to (further) average after calibration (during split) +timeavg = '8s' # Time interval to average after calibration (during split) +createmms = True # Create MMS (True) or MS (False) for cross-calibration during partition +keepmms = True # Output MMS (True) or MS (False) during split +spw = '*:880~933MHz,*:960~1010MHz,*:1010~1060MHz,*:1060~1110MHz,*:1110~1163MHz,*:1299~1350MHz,*:1350~1400MHz,*:1400~1450MHz,*:1450~1500MHz,*:1500~1524MHz,*:1630~1680MHz' # Spectral window / frequencies to extract for MMS +nspw = 11 # Number of spectral windows to split into +calcrefant = False # Calculate reference antenna in program (overwrites 'refant') +refant = 'm059' # Reference antenna name / number +standard = 'Stevens-Reynolds 2016'# Flux density standard for setjy +badants = [] # List of bad antenna numbers (to flag) +badfreqranges = [ '933~960MHz', # List of bad frequency ranges (to flag) + '1163~1299MHz', + '1524~1630MHz'] + +[selfcal] +nloops = 2 # Number of clean + bdsf loops. +loop = 0 # If nonzero, adds this number to nloops to name images or continue previous run +cell = '1.5arcsec' +robust = -0.5 +imsize = [6144, 6144] +wprojplanes = 512 +niter = [10000, 50000, 50000] +threshold = ['0.5mJy', 10, 10] # After loop 0, S/N values if >= 1.0, otherwise Jy +nterms = 2 # Number of taylor terms +gridder = 'wproject' +deconvolver = 'mtmfs' +calmode = ['','p'] # '' to skip solving (will also exclude mask for this loop), 'p' for phase-only and 'ap' for amplitude and phase +solint = ['','1min'] +uvrange = '' # uv range cutoff for gaincal +flag = True # Flag residual column after selfcal? +gaintype = 'G' # Use 'T' for polarisation on linear feeds (e.g. MeerKAT) +discard_nloops = 0 # Discard this many selfcal solutions (e.g. from quick and dirty image) during subsequent loops (only considers when calmode !='') +outlier_threshold = 0.0 # S/N values if >= 1.0, otherwise Jy +outlier_radius = 0.0 # Radius in degrees for identifying outliers in RACS +method ='default' # Mask creation method for science imaging (i.e. image or cube mask. default = image) + +[image] +cell = '2arcsec' +robust = 0.0 +imsize = [1500, 1500] +wprojplanes = 128 +niter = 50000 +threshold = '0.1mJy' # S/N value if >= 1.0 and rmsmap != '', otherwise Jy +multiscale = [0, 5, 10, 15] +nterms = 2 # Number of taylor terms +gridder = 'wproject' +deconvolver = 'multiscale' +specmode = 'cube' # 'mfs' for continuum image, 'cube' for spectral-line image +uvtaper = '' +restfreq = '1420.406MHz' # HI line (change for other emission lines - e.g. OH) +fitspw = '' # Emission line frequencies to exclude from fit to continuum during uvcontsub, and SPW to select for imaging when specmode='cube' +fitorder = 1 # Order of polynomial fit to continuum during uvcontsub +restoringbeam = '' +stokes = 'I' +pbthreshold = 0.1 # Threshold below which to mask the PB for PB correction +pbband = 'LBand' # Which band to use to generate the PB - one of "LBand" "SBand" or "UHF" +mask = '' +rmsmap = '' +outlierfile = '' + +[run] # Internal variables for pipeline execution +continue = True +dopol = False diff --git a/processMeerKAT/prepareTracks.py b/processMeerKAT/prepareTracks.py new file mode 100644 index 0000000..936a23f --- /dev/null +++ b/processMeerKAT/prepareTracks.py @@ -0,0 +1,78 @@ +import os +import sys +import glob +from shutil import copytree + +import config_parser +from config_parser import validate_args as va +import bookkeeping + +from casatasks import * +logfile=casalog.logfile() +casalog.setlogfile('logs/{SLURM_JOB_NAME}-{SLURM_JOB_ID}.casa'.format(**os.environ)) +import casampi + +from casatools import msmetadata,image +msmd = msmetadata() +ia = image() + +import logging +from time import gmtime +logging.Formatter.converter = gmtime +logger = logging.getLogger(__name__) +logging.basicConfig(format="%(asctime)-15s %(levelname)s: %(message)s", level=logging.INFO) + + +from aux_scripts.concat import check_output, get_infiles +from selfcal_scripts.selfcal_part2 import find_outliers, mask_image + + +def deconvolveMSs(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojplanes, niter, threshold, uvrange, nterms, + gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, outlier_radius, flag): + + imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,_,_,_,_ = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,outlier_radius,threshold,step='tclean') + calcpsf = True + + vis = vis.split(',') + + tclean(vis=vis, selectdata=False, datacolumn='corrected', imagename=imagename, + imsize=imsize[loop], cell=cell[loop], stokes='I', gridder=gridder[loop], + wprojplanes = wprojplanes[loop], deconvolver = deconvolver[loop], restoration=True, + weighting='briggs', robust = robust[loop], niter=niter[loop], outlierfile=outlierfile, + threshold=threshold[loop], nterms=nterms[loop], calcpsf=calcpsf, # cfcache = cfcache, + pblimit=-1, mask=pixmask, parallel = True) + +def makeMask(params, method='default'): + + if method == 'default': + rmsmap, outlierfile = find_outliers(**params,step='bdsf') + pixmask = mask_image(**params) + + #elif method == "your method": + #space to add different, user-defined methods into the pipeline. e.g., a spectral mask. + + return rmsmap, outlierfile, pixmask + + +if __name__ == '__main__': + + args,params = bookkeeping.get_selfcal_params() + method = params.pop('method')[0] + + ##blind deconvolution + deconvolveMSs(**params) + + #make mask for imaging + rmsmap, outlierfile, pixmask = makeMask(params, method=method) + + loop = params['loop'] + + #update config file for science imaging + if config_parser.has_section(args['config'], 'image'): + if config_parser.get_key(args['config'], 'image', 'specmode') != 'cube': + # Don't copy over continuum mask for cube-mode science imaging; allow for 3D mask (e.g. SoFiA) + config_parser.overwrite_config(args['config'], conf_dict={'mask' : "'{0}'".format(pixmask)}, conf_sec='image') + config_parser.overwrite_config(args['config'], conf_dict={'rmsmap' : "'{0}'".format(rmsmap)}, conf_sec='image') + config_parser.overwrite_config(args['config'], conf_dict={'outlierfile' : "'{0}'".format(outlierfile)}, conf_sec='image') + + bookkeeping.rename_logs(logfile) \ No newline at end of file diff --git a/processMeerKAT/processMeerKAT.py b/processMeerKAT/processMeerKAT.py index 5f58c1a..c106405 100755 --- a/processMeerKAT/processMeerKAT.py +++ b/processMeerKAT/processMeerKAT.py @@ -51,6 +51,7 @@ AUX_SCRIPTS_DIR = 'aux_scripts' SELFCAL_SCRIPTS_DIR = 'selfcal_scripts' CONFIG = 'default_config.txt' +CONFIG_COMB = 'default_config_comb.txt' TMP_CONFIG = '.config.tmp' MASTER_SCRIPT = 'submit_pipeline.sh' SPW_PREFIX = '*:' @@ -66,6 +67,7 @@ MPI_WRAPPER = 'mpirun' PRECAL_SCRIPTS = [('calc_refant.py',False,''),('partition.py',True,'')] #Scripts run before calibration at top level directory when nspw > 1 POSTCAL_SCRIPTS = [('concat.py',False,''),('plotcal_spw.py', False, ''),('selfcal_part1.py',True,''),('selfcal_part2.py',False,''),('science_image.py', True, '')] #Scripts run after calibration at top level directory when nspw > 1 +POSTCAL_SCRIPTS_COMB = [('prepareTracks.py',True,''),('science_image.py',True,'')] SCRIPTS = [ ('validate_input.py',False,''), ('flag_round_1.py',True,''), ('calc_refant.py',False,''), @@ -212,6 +214,7 @@ def parse_scripts(val): #add mutually exclusive group - don't want to build config, run pipeline, or display version at same time run_args = parser.add_mutually_exclusive_group(required=True) run_args.add_argument("-B","--build", action="store_true", required=False, default=False, help="Build config file using input MS.") + run_args.add_argument("-BC","--combtracks", action="store_true",required=False, default=False, help="Build config file to image multiple tracks") run_args.add_argument("-R","--run", action="store_true", required=False, default=False, help="Run pipeline with input config file.") run_args.add_argument("-V","--version", action="store_true", required=False, default=False, help="Display the version of this pipeline and quit.") run_args.add_argument("-L","--license", action="store_true", required=False, default=False, help="Display this program's license and quit.") @@ -221,6 +224,11 @@ def parse_scripts(val): if len(unknown) > 0: parser.error('Unknown input argument(s) present - {0}'.format(unknown)) + #force inclusion of selfcal and imaging parameters when combining tracks + if args.combtracks: + args.do2GC = True + args.science_image = True + if args.run: if args.config is None: parser.error("You must input a config file [--config] to run the pipeline.") @@ -279,8 +287,13 @@ def validate_args(args,config,parser=None): if args['MS'] not in [None,'None'] and not os.path.isdir(args['MS']): msg = "Input MS '{0}' not found.".format(args['MS']) raise_error(config, msg, parser) - - if parser is not None and not args['build'] and args['MS']: + + if parser is None or args['combtracks']: + if args['MS'] is None and not args['nofields']: + msg = "You must input the calibrated MS directories as a single comma-separated string into MS [-M --MS] to build the config file." + raise_error(config, msg, parser) + + if parser is not None and not args['build'] and not args['combtracks'] and args['MS']: msg = "Only input an MS [-M --MS] during [-B --build] step. Otherwise input is ignored." raise_error(config, msg, parser) @@ -923,8 +936,9 @@ def srun(arg_dict,qos=True,time=10,mem=4): return call def write_jobs(config, scripts=[], threadsafe=[], containers=[], num_precal_scripts=0, mpi_wrapper=MPI_WRAPPER, nodes=8, ntasks_per_node=4, mem=MEM_PER_NODE_GB_LIMIT,plane=1, partition='Main', - time='12:00:00', submit=False, name='', verbose=False, quiet=False, dependencies='', exclude='', account='b03-idia-ag', reservation='', modules=[], timestamp='', justrun=False): + time='12:00:00', submit=False, name='', verbose=False, quiet=False, dependencies='', exclude='', account='b03-idia-ag', reservation='', modules=[], timestamp='', justrun=False, combTracks=False): + """Write a series of sbatch job files to calibrate a CASA MeasurementSet. Arguments: @@ -973,8 +987,10 @@ def write_jobs(config, scripts=[], threadsafe=[], containers=[], num_precal_scri Modules to load upon execution of sbatch script. timestamp : str, optional Timestamp to put on this run and related runs in SPW directories. - justrun : bool, optionall - Just run the pipeline without rebuilding each job script (if it exists).""" + justrun : bool, optional + Just run the pipeline without rebuilding each job script (if it exists) + combTracks : bool, optional + Combining multiple tracks.""" kwargs = locals() crosscal_kwargs = get_config_kwargs(config, 'crosscal', CROSSCAL_CONFIG_KEYS) @@ -1093,6 +1109,41 @@ def default_config(arg_dict): logger.info('Config "{0}" generated.'.format(filename)) +def default_config_comb(arg_dict): + + """Generate default config file in current directory, pointing to MS, with fields and SLURM parameters set. + + Arguments: + ---------- + arg_dict : dict + Dictionary of arguments passed into this script, which is inserted into the config file under various sections.""" + + filename = arg_dict['config'] + MS = arg_dict['MS'] + + #Copy default config to current location + copyfile('{0}/{1}'.format(SCRIPT_DIR,CONFIG_COMB),filename) + + #Add SLURM CL arguments to config file under section [slurm] + slurm_dict = get_slurm_dict(arg_dict,SLURM_CONFIG_KEYS) + for key in SLURM_CONFIG_STR_KEYS: + if key in slurm_dict.keys(): slurm_dict[key] = "'{0}'".format(slurm_dict[key]) + + slurm_dict['postcal_scripts'] = POSTCAL_SCRIPTS_COMB + slurm_dict['precal_scripts'] = [] + slurm_dict['scripts'] = [] + arg_dict['scripts'] = [] + + #Overwrite CL parameters in config under section [slurm] + config_parser.overwrite_config(filename, conf_dict=slurm_dict, conf_sec='slurm') + + #Add MS to config file under section [data] and dopol under section [run] + config_parser.overwrite_config(filename, conf_dict={'vis' : "'{0}'".format(MS)}, conf_sec='data') + + config_parser.overwrite_config(filename, conf_dict={'scripts' : arg_dict['scripts']}, conf_sec='slurm') + + logger.info('Config "{0}" generated.'.format(filename)) + def get_slurm_dict(arg_dict,slurm_config_keys): """Build a slurm dictionary to be inserted into config file, using specified keys. @@ -1244,9 +1295,16 @@ def format_args(config,submit,quiet,dependencies,justrun): kwargs['num_precal_scripts'] = len(kwargs['precal_scripts']) - # Validate kwargs along with MS - kwargs['MS'] = data_kwargs['vis'] - validate_args(kwargs,config) + # Validate kwargs along with MS, and validate each MS individually in the case of combining tracks + if ',' in data_kwargs['vis']: #a comma indicates this is a list of MSs + MSs = data_kwargs['vis'].split(',') + for MS in MSs: + kwargs['MS'] = MS + validate_args(kwargs,config) + kwargs['MS'] = MSs + else: + kwargs['MS'] = data_kwargs['vis'] + validate_args(kwargs,config) #Reformat scripts tuple/list, to extract scripts, threadsafe, and containers as parallel lists #Check that path to each script and container exists or is '' @@ -1441,7 +1499,7 @@ def spw_split(spw,nspw,config,mem,badfreqranges,MS,partition,createmms=True,remo #Create each spw as directory and place config in there logger.info("Making {0} directories for SPWs ({1}) and copying '{2}' to each of them.".format(nspw,SPWs,config)) for spw in SPWs: - spw_config = '{0}/{1}'.format(spw.replace(SPW_PREFIX,''),config) + spw_config = '{0}/{1}'.format(spw.replace(SPW_PREFIX,''),config.split('/')[-1]) #fixes a bug where the leading slash causes a file error if not os.path.exists(spw.replace(SPW_PREFIX,'')): os.mkdir(spw.replace(SPW_PREFIX,'')) copyfile(config, spw_config) @@ -1544,6 +1602,8 @@ def main(): logger.info(license) if args.build: default_config(vars(args)) + if args.combtracks: + default_config_comb(vars(args)) if args.run: kwargs = format_args(args.config,args.submit,args.quiet,args.dependencies,args.justrun) write_jobs(args.config, **kwargs) diff --git a/processMeerKAT/science_image.py b/processMeerKAT/science_image.py index 3886f8a..29e8c94 100644 --- a/processMeerKAT/science_image.py +++ b/processMeerKAT/science_image.py @@ -103,7 +103,11 @@ def do_pb_corr(inpimage, pbthreshold=0, pbband='LBand'): def science_image(vis, spw, cell, robust, imsize, wprojplanes, niter, threshold, multiscale, nterms, gridder, deconvolver, specmode, uvtaper, restfreq, restoringbeam, stokes, mask, rmsmap, outlierfile, keepmms, pbthreshold, pbband, fitspw): - visbase = os.path.split(vis.rstrip('/ '))[1] # Get only vis name, not entire path + if ',' in vis: #currently setting visbase to the CWD in the case of combining tracks + vis = vis.split(',') + visbase = os.getcwd().split('/')[-1] + else: + visbase = os.path.split(vis.rstrip('/ '))[1] # Get only vis name, not entire path extn = '.ms' if keepmms==False else '.mms' imagename = visbase.replace(extn, '.science_image') # Images will be produced in $CWD @@ -125,7 +129,9 @@ def science_image(vis, spw, cell, robust, imsize, wprojplanes, niter, threshold, parallel = False if fitspw != '': spw = fitspw - + if isinstance(vis,list): #in the case of combining tracks, need a list equal to the number of MSs. Why? I don't know, but it didn't work with a single value + spw = [spw]*len(vis) + if not os.path.exists(imname): tclean(vis=vis, selectdata=False, datacolumn='corrected', imagename=imagename, diff --git a/processMeerKAT/selfcal_scripts/selfcal_part2.py b/processMeerKAT/selfcal_scripts/selfcal_part2.py index 60a4092..bc18e27 100644 --- a/processMeerKAT/selfcal_scripts/selfcal_part2.py +++ b/processMeerKAT/selfcal_scripts/selfcal_part2.py @@ -64,7 +64,7 @@ def selfcal_part2(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp else: logger.warning("Skipping selfcal loop {0} since calmode == ''.".format(loop)) -def pybdsf(imbase,rmsfile,imagename,outimage,thresh,maskfile,cat,trim_box=None,write_all=True): +def pybdsf(imbase,rmsfile,imagename,outimage,thresh,maskfile,cat,loop,trim_box=None,write_all=True): fitsname = outimage @@ -109,7 +109,7 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp outlier_min_flux = 1e-3 # 1 mJy if step != 'sky': - pybdsf(imbase,rmsfile,imagename,outimage,thresh,maskfile,cat) + pybdsf(imbase,rmsfile,imagename,outimage,thresh,maskfile,cat,loop) #Store before potentially updating to mJy orig_threshold = outlier_threshold @@ -283,7 +283,7 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp if os.path.exists(im): #Run PyBDSF on outlier and update mask - pybdsf(imbase,rmsfile,base,im,outlier_snr,outlier_mask,outlier_cat,write_all=False) + pybdsf(imbase,rmsfile,base,im,outlier_snr,outlier_mask,outlier_cat,loop,write_all=False) outlier_pixmask = mask_image(**local,outlier_base=base,outlier_image=im) else: #Use main image, run PyBDSF on box around outlier, and update mask @@ -300,7 +300,7 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp else: logger.warning("Image '{0}' doesn't exist. Assuming source exists inside main image, so running PyBDSF on '{1}' using {2}x{2} pixel trim box.".format(im,outimage,outlier_imsize)) - pybdsf(imbase,rmsfile,imagename,outimage,outlier_snr,outlier_mask,outlier_cat,trim_box=trim_box,write_all=False) + pybdsf(imbase,rmsfile,imagename,outimage,outlier_snr,outlier_mask,outlier_cat,loop,trim_box=trim_box,write_all=False) outlier_pixmask = mask_image(**local,outlier_base=base) mask = 'mask={0}'.format(outlier_pixmask)