Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Branch to image using multiple tracks #66

Open
wants to merge 4 commits into
base: HI-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@
*.egg-info
*.swp # vim swap files

.ipynb_checkpoints

build/
dist/
9 changes: 9 additions & 0 deletions processMeerKAT/bookkeeping.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,o
msmd = msmetadata()
qa = quanta()

if ',' in vis: #detect if we are combining tracks
vis = vis.split(',')[0]
combTracks = True
else:
combTracks = False

if os.path.exists('{0}/SUBMSS'.format(vis)):
tmpvis = glob.glob('{0}/SUBMSS/*'.format(vis))[0]
else:
Expand All @@ -213,6 +219,9 @@ def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,o

target_str = msmd.namesforfields(targetfield)[0]

if combTracks: #currently setting visbase to the CWD in the case of combining tracks
visbase = os.getcwd().split('/')[-1]

if '.ms' in visbase and target_str not in visbase:
basename = visbase.replace('.ms','.{0}'.format(target_str))
else:
Expand Down
97 changes: 97 additions & 0 deletions processMeerKAT/default_config_comb.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
[data]
vis = ''

[fields]
bpassfield = ''
fluxfield = ''
phasecalfield = ''
targetfields = ''
extrafields = ''

[slurm] # See processMeerKAT.py -h for documentation
nodes = 1
ntasks_per_node = 8
plane = 1
mem = 232 # Use this many GB of memory (per node)
partition = 'Main' # SLURM partition to use
exclude = '' # SLURM nodes to exclude
time = '12:00:00'
submit = False
container = '/idia/software/containers/casa-6.6.0-modular.sif'
mpi_wrapper = 'mpirun'
name = ''
dependencies = ''
account = 'b03-idia-ag'
reservation = ''
modules = ['openmpi/4.0.3']
verbose = False
precal_scripts = []
postcal_scripts = [('prepareTracks.py', True, ''), ('science_image.py', False, '')]
scripts = []

[crosscal]
minbaselines = 4 # Minimum number of baselines to use while calibrating
chanbin = 1 # Number of channels to average before calibration (during partition)
width = 1 # Number of channels to (further) average after calibration (during split)
timeavg = '8s' # Time interval to average after calibration (during split)
createmms = True # Create MMS (True) or MS (False) for cross-calibration during partition
keepmms = True # Output MMS (True) or MS (False) during split
spw = '*:880~933MHz,*:960~1010MHz,*:1010~1060MHz,*:1060~1110MHz,*:1110~1163MHz,*:1299~1350MHz,*:1350~1400MHz,*:1400~1450MHz,*:1450~1500MHz,*:1500~1524MHz,*:1630~1680MHz' # Spectral window / frequencies to extract for MMS
nspw = 11 # Number of spectral windows to split into
calcrefant = False # Calculate reference antenna in program (overwrites 'refant')
refant = 'm059' # Reference antenna name / number
standard = 'Stevens-Reynolds 2016'# Flux density standard for setjy
badants = [] # List of bad antenna numbers (to flag)
badfreqranges = [ '933~960MHz', # List of bad frequency ranges (to flag)
'1163~1299MHz',
'1524~1630MHz']

[selfcal]
nloops = 2 # Number of clean + bdsf loops.
loop = 0 # If nonzero, adds this number to nloops to name images or continue previous run
cell = '1.5arcsec'
robust = -0.5
imsize = [6144, 6144]
wprojplanes = 512
niter = [10000, 50000, 50000]
threshold = ['0.5mJy', 10, 10] # After loop 0, S/N values if >= 1.0, otherwise Jy
nterms = 2 # Number of taylor terms
gridder = 'wproject'
deconvolver = 'mtmfs'
calmode = ['','p'] # '' to skip solving (will also exclude mask for this loop), 'p' for phase-only and 'ap' for amplitude and phase
solint = ['','1min']
uvrange = '' # uv range cutoff for gaincal
flag = True # Flag residual column after selfcal?
gaintype = 'G' # Use 'T' for polarisation on linear feeds (e.g. MeerKAT)
discard_nloops = 0 # Discard this many selfcal solutions (e.g. from quick and dirty image) during subsequent loops (only considers when calmode !='')
outlier_threshold = 0.0 # S/N values if >= 1.0, otherwise Jy
outlier_radius = 0.0 # Radius in degrees for identifying outliers in RACS
method ='default' # Mask creation method for science imaging (i.e. image or cube mask. default = image)

[image]
cell = '2arcsec'
robust = 0.0
imsize = [1500, 1500]
wprojplanes = 128
niter = 50000
threshold = '0.1mJy' # S/N value if >= 1.0 and rmsmap != '', otherwise Jy
multiscale = [0, 5, 10, 15]
nterms = 2 # Number of taylor terms
gridder = 'wproject'
deconvolver = 'multiscale'
specmode = 'cube' # 'mfs' for continuum image, 'cube' for spectral-line image
uvtaper = ''
restfreq = '1420.406MHz' # HI line (change for other emission lines - e.g. OH)
fitspw = '' # Emission line frequencies to exclude from fit to continuum during uvcontsub, and SPW to select for imaging when specmode='cube'
fitorder = 1 # Order of polynomial fit to continuum during uvcontsub
restoringbeam = ''
stokes = 'I'
pbthreshold = 0.1 # Threshold below which to mask the PB for PB correction
pbband = 'LBand' # Which band to use to generate the PB - one of "LBand" "SBand" or "UHF"
mask = ''
rmsmap = ''
outlierfile = ''

[run] # Internal variables for pipeline execution
continue = True
dopol = False
78 changes: 78 additions & 0 deletions processMeerKAT/prepareTracks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import os
import sys
import glob
from shutil import copytree

import config_parser
from config_parser import validate_args as va
import bookkeeping

from casatasks import *
logfile=casalog.logfile()
casalog.setlogfile('logs/{SLURM_JOB_NAME}-{SLURM_JOB_ID}.casa'.format(**os.environ))
import casampi

from casatools import msmetadata,image
msmd = msmetadata()
ia = image()

import logging
from time import gmtime
logging.Formatter.converter = gmtime
logger = logging.getLogger(__name__)
logging.basicConfig(format="%(asctime)-15s %(levelname)s: %(message)s", level=logging.INFO)


from aux_scripts.concat import check_output, get_infiles
from selfcal_scripts.selfcal_part2 import find_outliers, mask_image


def deconvolveMSs(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojplanes, niter, threshold, uvrange, nterms,
gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, outlier_radius, flag):

imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,_,_,_,_ = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,outlier_radius,threshold,step='tclean')
calcpsf = True

vis = vis.split(',')

tclean(vis=vis, selectdata=False, datacolumn='corrected', imagename=imagename,
imsize=imsize[loop], cell=cell[loop], stokes='I', gridder=gridder[loop],
wprojplanes = wprojplanes[loop], deconvolver = deconvolver[loop], restoration=True,
weighting='briggs', robust = robust[loop], niter=niter[loop], outlierfile=outlierfile,
threshold=threshold[loop], nterms=nterms[loop], calcpsf=calcpsf, # cfcache = cfcache,
pblimit=-1, mask=pixmask, parallel = True)

def makeMask(params, method='default'):

if method == 'default':
rmsmap, outlierfile = find_outliers(**params,step='bdsf')
pixmask = mask_image(**params)

#elif method == "your method":
#space to add different, user-defined methods into the pipeline. e.g., a spectral mask.

return rmsmap, outlierfile, pixmask


if __name__ == '__main__':

args,params = bookkeeping.get_selfcal_params()
method = params.pop('method')[0]

##blind deconvolution
deconvolveMSs(**params)

#make mask for imaging
rmsmap, outlierfile, pixmask = makeMask(params, method=method)

loop = params['loop']

#update config file for science imaging
if config_parser.has_section(args['config'], 'image'):
if config_parser.get_key(args['config'], 'image', 'specmode') != 'cube':
# Don't copy over continuum mask for cube-mode science imaging; allow for 3D mask (e.g. SoFiA)
config_parser.overwrite_config(args['config'], conf_dict={'mask' : "'{0}'".format(pixmask)}, conf_sec='image')
config_parser.overwrite_config(args['config'], conf_dict={'rmsmap' : "'{0}'".format(rmsmap)}, conf_sec='image')
config_parser.overwrite_config(args['config'], conf_dict={'outlierfile' : "'{0}'".format(outlierfile)}, conf_sec='image')

bookkeeping.rename_logs(logfile)
78 changes: 69 additions & 9 deletions processMeerKAT/processMeerKAT.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
AUX_SCRIPTS_DIR = 'aux_scripts'
SELFCAL_SCRIPTS_DIR = 'selfcal_scripts'
CONFIG = 'default_config.txt'
CONFIG_COMB = 'default_config_comb.txt'
TMP_CONFIG = '.config.tmp'
MASTER_SCRIPT = 'submit_pipeline.sh'
SPW_PREFIX = '*:'
Expand All @@ -66,6 +67,7 @@
MPI_WRAPPER = 'mpirun'
PRECAL_SCRIPTS = [('calc_refant.py',False,''),('partition.py',True,'')] #Scripts run before calibration at top level directory when nspw > 1
POSTCAL_SCRIPTS = [('concat.py',False,''),('plotcal_spw.py', False, ''),('selfcal_part1.py',True,''),('selfcal_part2.py',False,''),('science_image.py', True, '')] #Scripts run after calibration at top level directory when nspw > 1
POSTCAL_SCRIPTS_COMB = [('prepareTracks.py',True,''),('science_image.py',True,'')]
SCRIPTS = [ ('validate_input.py',False,''),
('flag_round_1.py',True,''),
('calc_refant.py',False,''),
Expand Down Expand Up @@ -212,6 +214,7 @@ def parse_scripts(val):
#add mutually exclusive group - don't want to build config, run pipeline, or display version at same time
run_args = parser.add_mutually_exclusive_group(required=True)
run_args.add_argument("-B","--build", action="store_true", required=False, default=False, help="Build config file using input MS.")
run_args.add_argument("-BC","--combtracks", action="store_true",required=False, default=False, help="Build config file to image multiple tracks")
run_args.add_argument("-R","--run", action="store_true", required=False, default=False, help="Run pipeline with input config file.")
run_args.add_argument("-V","--version", action="store_true", required=False, default=False, help="Display the version of this pipeline and quit.")
run_args.add_argument("-L","--license", action="store_true", required=False, default=False, help="Display this program's license and quit.")
Expand All @@ -221,6 +224,11 @@ def parse_scripts(val):
if len(unknown) > 0:
parser.error('Unknown input argument(s) present - {0}'.format(unknown))

#force inclusion of selfcal and imaging parameters when combining tracks
if args.combtracks:
args.do2GC = True
args.science_image = True

if args.run:
if args.config is None:
parser.error("You must input a config file [--config] to run the pipeline.")
Expand Down Expand Up @@ -279,8 +287,13 @@ def validate_args(args,config,parser=None):
if args['MS'] not in [None,'None'] and not os.path.isdir(args['MS']):
msg = "Input MS '{0}' not found.".format(args['MS'])
raise_error(config, msg, parser)

if parser is not None and not args['build'] and args['MS']:

if parser is None or args['combtracks']:
if args['MS'] is None and not args['nofields']:
msg = "You must input the calibrated MS directories as a single comma-separated string into MS [-M --MS] to build the config file."
raise_error(config, msg, parser)

if parser is not None and not args['build'] and not args['combtracks'] and args['MS']:
msg = "Only input an MS [-M --MS] during [-B --build] step. Otherwise input is ignored."
raise_error(config, msg, parser)

Expand Down Expand Up @@ -923,8 +936,9 @@ def srun(arg_dict,qos=True,time=10,mem=4):
return call

def write_jobs(config, scripts=[], threadsafe=[], containers=[], num_precal_scripts=0, mpi_wrapper=MPI_WRAPPER, nodes=8, ntasks_per_node=4, mem=MEM_PER_NODE_GB_LIMIT,plane=1, partition='Main',
time='12:00:00', submit=False, name='', verbose=False, quiet=False, dependencies='', exclude='', account='b03-idia-ag', reservation='', modules=[], timestamp='', justrun=False):
time='12:00:00', submit=False, name='', verbose=False, quiet=False, dependencies='', exclude='', account='b03-idia-ag', reservation='', modules=[], timestamp='', justrun=False, combTracks=False):


"""Write a series of sbatch job files to calibrate a CASA MeasurementSet.

Arguments:
Expand Down Expand Up @@ -973,8 +987,10 @@ def write_jobs(config, scripts=[], threadsafe=[], containers=[], num_precal_scri
Modules to load upon execution of sbatch script.
timestamp : str, optional
Timestamp to put on this run and related runs in SPW directories.
justrun : bool, optionall
Just run the pipeline without rebuilding each job script (if it exists)."""
justrun : bool, optional
Just run the pipeline without rebuilding each job script (if it exists)
combTracks : bool, optional
Combining multiple tracks."""

kwargs = locals()
crosscal_kwargs = get_config_kwargs(config, 'crosscal', CROSSCAL_CONFIG_KEYS)
Expand Down Expand Up @@ -1093,6 +1109,41 @@ def default_config(arg_dict):

logger.info('Config "{0}" generated.'.format(filename))

def default_config_comb(arg_dict):

"""Generate default config file in current directory, pointing to MS, with fields and SLURM parameters set.

Arguments:
----------
arg_dict : dict
Dictionary of arguments passed into this script, which is inserted into the config file under various sections."""

filename = arg_dict['config']
MS = arg_dict['MS']

#Copy default config to current location
copyfile('{0}/{1}'.format(SCRIPT_DIR,CONFIG_COMB),filename)

#Add SLURM CL arguments to config file under section [slurm]
slurm_dict = get_slurm_dict(arg_dict,SLURM_CONFIG_KEYS)
for key in SLURM_CONFIG_STR_KEYS:
if key in slurm_dict.keys(): slurm_dict[key] = "'{0}'".format(slurm_dict[key])

slurm_dict['postcal_scripts'] = POSTCAL_SCRIPTS_COMB
slurm_dict['precal_scripts'] = []
slurm_dict['scripts'] = []
arg_dict['scripts'] = []

#Overwrite CL parameters in config under section [slurm]
config_parser.overwrite_config(filename, conf_dict=slurm_dict, conf_sec='slurm')

#Add MS to config file under section [data] and dopol under section [run]
config_parser.overwrite_config(filename, conf_dict={'vis' : "'{0}'".format(MS)}, conf_sec='data')

config_parser.overwrite_config(filename, conf_dict={'scripts' : arg_dict['scripts']}, conf_sec='slurm')

logger.info('Config "{0}" generated.'.format(filename))

def get_slurm_dict(arg_dict,slurm_config_keys):

"""Build a slurm dictionary to be inserted into config file, using specified keys.
Expand Down Expand Up @@ -1244,9 +1295,16 @@ def format_args(config,submit,quiet,dependencies,justrun):

kwargs['num_precal_scripts'] = len(kwargs['precal_scripts'])

# Validate kwargs along with MS
kwargs['MS'] = data_kwargs['vis']
validate_args(kwargs,config)
# Validate kwargs along with MS, and validate each MS individually in the case of combining tracks
if ',' in data_kwargs['vis']: #a comma indicates this is a list of MSs
MSs = data_kwargs['vis'].split(',')
for MS in MSs:
kwargs['MS'] = MS
validate_args(kwargs,config)
kwargs['MS'] = MSs
else:
kwargs['MS'] = data_kwargs['vis']
validate_args(kwargs,config)

#Reformat scripts tuple/list, to extract scripts, threadsafe, and containers as parallel lists
#Check that path to each script and container exists or is ''
Expand Down Expand Up @@ -1441,7 +1499,7 @@ def spw_split(spw,nspw,config,mem,badfreqranges,MS,partition,createmms=True,remo
#Create each spw as directory and place config in there
logger.info("Making {0} directories for SPWs ({1}) and copying '{2}' to each of them.".format(nspw,SPWs,config))
for spw in SPWs:
spw_config = '{0}/{1}'.format(spw.replace(SPW_PREFIX,''),config)
spw_config = '{0}/{1}'.format(spw.replace(SPW_PREFIX,''),config.split('/')[-1]) #fixes a bug where the leading slash causes a file error
if not os.path.exists(spw.replace(SPW_PREFIX,'')):
os.mkdir(spw.replace(SPW_PREFIX,''))
copyfile(config, spw_config)
Expand Down Expand Up @@ -1544,6 +1602,8 @@ def main():
logger.info(license)
if args.build:
default_config(vars(args))
if args.combtracks:
default_config_comb(vars(args))
if args.run:
kwargs = format_args(args.config,args.submit,args.quiet,args.dependencies,args.justrun)
write_jobs(args.config, **kwargs)
Expand Down
10 changes: 8 additions & 2 deletions processMeerKAT/science_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ def do_pb_corr(inpimage, pbthreshold=0, pbband='LBand'):

def science_image(vis, spw, cell, robust, imsize, wprojplanes, niter, threshold, multiscale, nterms, gridder, deconvolver, specmode, uvtaper, restfreq, restoringbeam, stokes, mask, rmsmap, outlierfile, keepmms, pbthreshold, pbband, fitspw):

visbase = os.path.split(vis.rstrip('/ '))[1] # Get only vis name, not entire path
if ',' in vis: #currently setting visbase to the CWD in the case of combining tracks
vis = vis.split(',')
visbase = os.getcwd().split('/')[-1]
else:
visbase = os.path.split(vis.rstrip('/ '))[1] # Get only vis name, not entire path
extn = '.ms' if keepmms==False else '.mms'
imagename = visbase.replace(extn, '.science_image') # Images will be produced in $CWD

Expand All @@ -125,7 +129,9 @@ def science_image(vis, spw, cell, robust, imsize, wprojplanes, niter, threshold,
parallel = False
if fitspw != '':
spw = fitspw

if isinstance(vis,list): #in the case of combining tracks, need a list equal to the number of MSs. Why? I don't know, but it didn't work with a single value
spw = [spw]*len(vis)

if not os.path.exists(imname):

tclean(vis=vis, selectdata=False, datacolumn='corrected', imagename=imagename,
Expand Down
Loading