## Generate plots of quantities from a measurement set
##
## $Id: plotiterator.py,v 1.25 2017-02-21 09:10:05 jive_cc Exp $
##
## $Log: plotiterator.py,v $
## Revision 1.25 2017-02-21 09:10:05 jive_cc
## HV: * DesS requests normalized vector averaging - complex numbers are first
## normalized before being averaged. See "help avt" or "help avc".
##
## Revision 1.24 2017-01-27 13:50:28 jive_cc
## HV: * jplotter.py: small edits
## - "not refresh(e)" => "refresh(e); if not e.plots ..."
## - "e.rawplots.XXX" i.s.o. "e.plots.XXX"
## * relatively big overhaul: in order to force (old) pyrap to
## re-read tables from disk all table objects must call ".close()"
## when they're done.
## Implemented by patching the pyrap.tables.table object on the fly
## with '__enter__' and '__exit__' methods (see "ms2util.opentable(...)")
## such that all table access can be done in a "with ..." block:
## with ms2util.opentable(...) as tbl:
## tbl.getcol('DATA') # ...
## and then when the with-block is left, tbl gets automagically closed
##
## Revision 1.23 2015-12-09 07:02:11 jive_cc
## HV: * big change! the plotiterators now return a one-dimensional dict
## of label => dataset. The higher level code reorganizes them
## into plots, based on the 'new plot' settings. Many wins in this one:
## - the plotiterators only have to index one level in stead of two
## - when the 'new plot' setting is changed, we don't have to read the
## data from disk again [this is a *HUGE* improvement, especially for
## larger data sets]
## - the data set expression parser is simpler, it works on the
## one-dimensional 'list' of data sets and it does not have to
## flatten/unflatten any more
## * The code to deal with refreshing the plots has been rewritten a bit
## such that all necessary steps (re-organizing raw plots into plots,
## re-running the label processing, re-running the post processing,
## re-running the min/max processing) are executed only once; when
## necessary. And this code is now shared between the "pl" command and
## the "load/store" commands.
##
## Revision 1.22 2015-09-23 12:28:36 jive_cc
## HV: * Lorant S. requested sensible requests (ones that were already in the
## back of my mind too):
## - option to specify the data column
## - option to not reorder the spectral windows
## Both options are now supported by the code and are triggered by
## passing options to the "ms" command
##
## Revision 1.21 2015-04-29 14:34:55 jive_cc
## HV: * add support for plotting quantity vs UV-distance
##
## Revision 1.20 2015-04-08 14:34:12 jive_cc
## HV: * Correct checking of wether dataset.[xy] are of the numpy.ndarray
## persuasion
##
## Revision 1.19 2015-02-16 12:56:53 jive_cc
## HV: * Now that we do our own slicing, found that some of the index limits
## were off-by-one
##
## Revision 1.18 2015-02-02 08:55:22 jive_cc
## HV: * support for storing/loading plots, potentially manipulating them
## via arbitrary arithmetic expressions
## * helpfile layout improved
##
## Revision 1.17 2015-01-09 14:27:57 jive_cc
## HV: * fixed copy-paste error in weight-thresholded quantity-versus-time fn
## * sped up SOLINT processing by factor >= 2
## * output of ".... took XXXs" consistentified & beautified
## * removed "Need to convert ..." output; the predicted expected runtime
## was usually very wrong anyway.
##
## Revision 1.16 2015-01-09 00:02:27 jive_cc
## HV: * support for 'solint' - accumulate data in time bins of size 'solint'
## now also in "xxx vs time" plots. i.e. can be used to bring down data
## volume by e.g. averaging down to an arbitrary amount of seconds.
## * solint can now be more flexibly be set using d(ays), h(ours),
## m(inutes) and/or s(econds). Note that in the previous versions a
## unitless specification was acceptable, in this one no more.
##
## Revision 1.15 2014-11-28 14:25:04 jive_cc
## HV: * spelling error in variable name ...
##
## Revision 1.14 2014-11-26 14:56:21 jive_cc
## HV: * pycasa autodetection and use
##
## Revision 1.13 2014-05-14 17:35:15 jive_cc
## HV: * if weight threshold is applied this is annotated in the plot
## * the plotiterators have two implementations now, one with weight
## thresholding and one without. Until I find a method that is
## equally fast with/without weight masking
##
## Revision 1.12 2014-05-14 17:02:01 jive_cc
## HV: * Weight thresholding implemented - but maybe I'll double the code
## to two different functions, one with weight thresholding and one
## without because weight thresholding is sloooooow
##
## Revision 1.11 2014-05-12 21:27:28 jive_cc
## HV: * IF time was an essential part of a label, its resolution of 1second
## was not enough - made it 1/100th of a second. So now you can safely
## plot data sets with individual time stamps even if they're << 1 second
## apart
##
## Revision 1.10 2014-05-06 14:20:39 jive_cc
## HV: * Added marking capability
##
## Revision 1.9 2014-04-15 07:53:17 jive_cc
## HV: * time averaging now supports 'solint' = None => average all data in
## each time-range selection bin
##
## Revision 1.8 2014-04-14 21:04:44 jive_cc
## HV: * Information common to all plot- or data set labels is now stripped
## and displayed in the plot heading i.s.o in the plot- or data set label
##
## Revision 1.7 2014-04-14 14:46:05 jive_cc
## HV: * Uses pycasa.so for table data access waiting for pyrap to be fixed
## * added "indexr" + scan-based selection option
##
## Revision 1.6 2014-04-10 21:14:40 jive_cc
## HV: * I fell for the age-old Python trick where a default argument is
## initialized statically - all data sets were integrating into the
## the same arrays! Nice!
## * Fixed other efficiency measures: with time averaging data already
## IS in numarray so no conversion needs to be done
## * more improvements
##
## Revision 1.5 2014-04-09 08:26:46 jive_cc
## HV: * Ok, moved common plotiterator stuff into baseclass
##
## Revision 1.4 2014-04-08 23:34:13 jive_cc
## HV: * Minor fixes - should be better now
##
## Revision 1.3 2014-04-08 22:41:11 jive_cc
## HV: Finally! This might be release 0.1!
## * python based plot iteration now has tolerable speed
## (need to test on 8M row MS though)
## * added quite a few plot types, simplified plotters
## (plotiterators need a round of moving common functionality
## into base class)
## * added generic X/Y plotter
##
## Revision 1.2 2014-04-02 17:55:30 jive_cc
## HV: * another savegame, this time with basic plotiteration done in Python
##
## Revision 1.1 2013-12-12 14:10:16 jive_cc
## HV: * another savegame. Now going with pythonic based plotiterator,
## built around ms2util.reducems
##
##
from __future__ import print_function
from six import iteritems
import ms2util, hvutil, jenums, itertools, copy, operator, numpy, math, imp, time, collections, functional, plotutil
import pyrap.quanta
# Auto-detect pycasa
havePyCasa = True
try:
import pycasa
print("*** using PyCasa for measurement set data access ***")
except ImportError:
havePyCasa = False
## Introduce some shorthands
NOW = time.time
CP = copy.deepcopy
AX = jenums.Axes
AVG = jenums.Averaging
YTypes = plotutil.YTypes
Quantity = collections.namedtuple('Quantity', ['quantity_name', 'quantity_fn'])
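# e.g. a hypothetical instance: Quantity('amplitude', numpy.abs) would pair the
# display name of a derived quantity with the function that computes it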
# do not drag in all numpy.* names but by "resolving" them at this level
# we shave off a lot of python name lookups. this has an effect on code which
# is called a lot of times per second - like the code in here
ANY = numpy.any
ALL = numpy.all
ADD = numpy.add
SQRT = numpy.sqrt
SQUARE = numpy.square
ARRAY = numpy.array
MARRAY = numpy.ma.array
ISFINITE = numpy.isfinite
LOGICAL_OR = numpy.logical_or
_ArrayT = numpy.ndarray
_MArrayT = numpy.ma.core.MaskedArray
IsArray = lambda x: isinstance(x, (_ArrayT, _MArrayT, list))
# Useful simple functions
# take the current channel selection, and produce a list of the sorted unique channels
mk_chansel = functional.compose(list, sorted, set, CP)
print_if = functional.choice(operator.truth, functional.printf, functional.const(None))
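# Illustrative sketch (assuming functional.compose applies right-to-left, per
# the argument order above): mk_chansel deep-copies the current selection,
# uniquifies it, sorts it and returns a list:
# >>> mk_chansel([3, 1, 3, 2])
# [1, 2, 3]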
# We support different kinds of averaging
def avg_vectornorm(ax):
def do_it(x):
# first set all flagged + NaN/Inf values to 0 such that
# (1) any NaN/Inf's don't screw up the total sum
# (2) flagged data doesn't count towards the sum
# We're going to need the input-mask twice
imask = LOGICAL_OR(~ISFINITE(x.data), x.mask)
# Sum all values along the requested axis
x.data[imask] = 0
total = numpy.sum(x.data, axis=ax, keepdims=True)
# figure out where the counts along the requested axis are 0
# and transform to a mask such that masked values in the *output*
# may effectively be removed
flags = ARRAY(numpy.sum(~imask, axis=ax, keepdims=True)==0, dtype=numpy.bool)
# Find the maximum unflagged value along the input axis.
# Flagged data gets set to -inf such that max()
# may yield a useful result
mdata = numpy.abs(x.data)
mdata[imask] = -numpy.inf
total /= numpy.max(mdata, axis=ax, keepdims=True)
# remove those data points
total[flags] = numpy.nan
return MARRAY(total, mask = flags)
return do_it
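# Illustrative sketch of avg_vectornorm along axis 1 (hypothetical values):
# the NaN sample is flagged away and the sum (3+0j) is normalized by the
# largest unflagged amplitude (2.0)
# >>> x = MARRAY([[1+0j, 2+0j, numpy.nan]], mask=[[False, False, False]])
# >>> avg_vectornorm(1)(x)
# -> masked array [[1.5+0j]], nothing masked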
def avg_arithmetic(ax):
# normal arithmetic mean, should work both on complex or scalar data
def do_it(x):
# first set all flagged + NaN/Inf values to 0 such that
# (1) any NaN/Inf's don't screw up the total sum
# (2) flagged data doesn't count towards the sum
imask = LOGICAL_OR(~ISFINITE(x.data), x.mask)
x.data[ imask ] = 0
total = numpy.sum(x.data, axis=ax, keepdims=True)
counts = numpy.sum(~imask, axis=ax, keepdims=True)
# figure out where the counts are 0 - effectively
# remove those data points
nmask = ARRAY(counts==0, dtype=numpy.bool)
# we have computed where the count==0 so we can now
# overwrite with 1 to prevent divide-by-zero errors.
# Later we'll replace those values with NaN
counts[nmask]=1
total /= counts
# And indicate where there was no average at all
total[nmask] = numpy.NaN
return MARRAY(total, mask = nmask)
return do_it
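# Illustrative sketch: only unflagged, finite values contribute to the mean
# >>> x = MARRAY([[1., 3., 5.]], mask=[[False, False, True]])
# >>> avg_arithmetic(1)(x)
# -> masked array [[2.0]], nothing masked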
def avg_sum(ax):
# normal arithmetic mean, should work both on complex or scalar data
def do_it(x):
# first set all flagged + NaN/Inf values to 0 such that
# (1) any NaN/Inf's don't screw up the total sum
# (2) flagged data doesn't count towards the sum
imask = LOGICAL_OR(~ISFINITE(x.data), x.mask)
x.data[ imask ] = 0
# Sum all values along the requested axis
total = numpy.sum(x.data, axis=ax, keepdims=True)
# count unflagged points along that axis and set flag if
# there aren't any of those
flags = ARRAY(numpy.sum(~imask, axis=ax, keepdims=True)==0, dtype=numpy.bool)
# remove points that didn't have any unflagged data
total[flags] = numpy.nan
return MARRAY(total, mask = flags)
return do_it
def avg_none(_):
return functional.identity
## The base class holds the actual table object -
## makes sure the selection etc gets done
class plotbase(object):
def __enter__(self, *args, **kwargs):
return self
def __exit__(self, *args, **kwargs):
if hasattr(self, 'table'):
self.table.close()
# depending on the combination of query-or-not and read-flags-or-not
# we pick the optimal call sequence for processing a table
# key = (qryYesNo, readFlagYesNo)
# I think that executing an empty query
# tbl.query('')
# takes longer than
# tbl.query()
_qrycolmapf = {
(False, False): lambda tbl, q, c: tbl, # no query, no flag-column reading
(True , False): lambda tbl, q, c: tbl.query(q), # only query
(False, True ): lambda tbl, q, c: tbl.query(columns=c), # only different columns
(True, True ): lambda tbl, q, c: tbl.query(q, columns=c) # the works
}
## selection is a selection object from 'selection.py'
def __init__(self, msname, selection, mapping, **kwargs):
self.verbose = kwargs.setdefault('verbose', True)
self.flags = kwargs.get('readflags', True)
self.datacol = CP(mapping.domain.column)
self.table = pycasa.table(msname) if havePyCasa else ms2util.opentable(msname)
# when reading flags is enabled let the C++ do the OR'ing of FLAG_ROW, FLAG columns for us
colnames = None #"*, (FLAG_ROW || FLAG) AS FLAGCOL" if self.flags else None
## apply selection if necessary
qry = selection.selectionTaQL()
s = NOW()
self.table = plotbase._qrycolmapf[(bool(qry), bool(colnames))](self.table, qry, colnames)
e = NOW()
if not self.table:
raise RuntimeError("No data available for your selection criteria")
if qry and self.verbose:
print("Query took\t\t{0:.3f}s".format(e-s))
# we'll provide overrides for specific column readers
# for efficiency for the WEIGHT and/or FLAG data
#
# Subclasses can request 'WEIGHTCOL' and/or 'FLAGCOL'
# in their call to reducems2(...), provided they pass
# in the self.slicers{..} object which we'll have prepared
self.slicers = dict()
# Set up weight thresholding.
# We have to be able to deal with the following weight shapes:
# numpy.Inf (weights not read)
# (n_int, n_pol) [WEIGHT column read]
# (n_int, n_freq, n_pol) [WEIGHT_SPECTRUM read]
#
# In order to turn the weight criterion (if any) into a mask we must
# potentially broadcast the WEIGHT shape (one weight per polarization)
# to all channels; the data always has shape:
# (n_int, n_freq, n_pol)
#
# We can do that efficiently by transposing the data array in that case to be:
# (n_freq, n_int, n_pol)
# Now the data mask also has this shape.
# Then, numpy.logical_or(data.mask, weight_mask) does the right thing:
# weight_mask == (n_int, n_pol) -> dimensions match on both data.mask and weight_mask
# but for the first -> broadcasted along dim 0, which
# is n_freq, i.e. each spectral point gets the same weight per pol
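# Shapes-only sketch of that broadcast (hypothetical sizes):
#   data.mask   (n_int, n_freq, n_pol) --transpose((1,0,2))--> (n_freq, n_int, n_pol)
#   weight_mask (n_int, n_pol)            # WEIGHT below threshold
#   LOGICAL_OR(data.mask, weight_mask)    # weight_mask broadcasts over n_freq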
self.threshold = CP(selection.weightThreshold) if selection.weightThreshold is not None else -numpy.Inf
transpose = weight_rd = None
if self.threshold == -numpy.Inf:
# No weight thresholding? Return an infinite weight to effectively disable thresholding
weight_rd = lambda _a,_b,_c,_d: numpy.Inf
# Also no need to transpose/untranspose the data array for this 'shape'
transpose = functional.identity
else:
# weight read from MS, choose which column to use
weight_col = 'WEIGHT_SPECTRUM' if 'WEIGHT_SPECTRUM' in self.table.colnames() else 'WEIGHT'
weight_rd = lambda tab, _, s, n: tab.getcol(weight_col, startrow=s, nrow=n)
# the need to transpose/untranspose the data array depends on which is the weight column
transpose = operator.methodcaller('transpose', (1,0,2)) if weight_col == 'WEIGHT' else functional.identity
# install the appropriate weight reader for the 'WEIGHTCOL' column
self.slicers['WEIGHTCOL'] = weight_rd
self.transpose = transpose
# Set up FLAG reading
# If self.flags is set, then we take FLAG, FLAG_ROW from the MS,
# otherwise we override the slicers with no-ops that do nothing
# but return 'not flagged'
# Because we need to do (FLAG || FLAG_ROW) we play the same transpose
# trick as with the WEIGHT/WEIGHT_SPECTRUM above only different.
# FLAG = (nrow, nchan, npol)
# FLAG_ROW = (nrow,)
# so by doing "flag.transpose((1,2,0))" it becomes (nchan, npol, nrow)
# and now numpy.logical_or(flag, flag_row) broadcasts Just Fine(tm)!
# But we only have to do that if we actually read flags from the MS ...
self.transpose_flag = operator.methodcaller('transpose', (1,2,0))
self.untranspose_flag = operator.methodcaller('transpose', (2,0,1))
if not self.flags:
# no flags to be read, replace with no-ops
no_flag = lambda _a,_b,_c,_d: False
self.slicers['FLAG'] = no_flag
self.slicers['FLAG_ROW'] = no_flag
self.transpose_flag = self.untranspose_flag = functional.identity
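# Shapes-only sketch of the FLAG/FLAG_ROW trick described above:
#   FLAG     (nrow, nchan, npol) --transpose((1,2,0))--> (nchan, npol, nrow)
#   FLAG_ROW (nrow,)
#   LOGICAL_OR(flag_t, flag_row)   # FLAG_ROW broadcasts along the row axis
# and untranspose((2,0,1)) restores (nrow, nchan, npol)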
## Parse data-description-id selection into a map:
## self.ddSelection will be
## map [ DATA_DESC_ID ] => (FQ, SB, POLS)
##
## With FQ, SB integer - the indices,
## POLS = [ (idx, str), ... ]
## i.e. list of row indices and the polarization string
## to go with it, such that the polarization data
## is put in the correct plot/data set immediately
##
# The fact of the matter is that the polarization row index ('idx'
# above) is not a unique mapping to physical polarization so we cannot
# get away with using the numerical label, even though that would be
# faster
_pMap = mapping.polarizationMap
_spwMap = mapping.spectralMap
GETF = _spwMap.frequenciesOfFREQ_SB
# Frequencies get done in MHz
scale = 1e6 if mapping.domain.domain == jenums.Type.Spectral else 1
## if user did not pass DATA_DESC_ID selection, default to all
if selection.ddSelection:
## An element in "ddSelection" is a 4-element tuple with
## fields (FQ, SB, POLID, [product indices])
## So all we need is to pair the product indices with the
## appropriate polarization strings
GETDDID = _spwMap.datadescriptionIdOfFREQ_SB_POL
ITEMGET = hvutil.itemgetter
def ddIdAdder(acc, ddSel):
(fq, sb, pid, l) = ddSel
ddId = GETDDID(fq, sb, pid)
polStrings = _pMap.getPolarizations(pid)
acc[0][ ddId ] = (fq, sb, functional.zip_(l, ITEMGET(*l)(polStrings)))
acc[1][ ddId ] = GETF(fq, sb)/scale
return acc
(self.ddSelection, self.ddFreqs) = functional.reduce(ddIdAdder, selection.ddSelection, [{}, {}])
else:
ddids = _spwMap.datadescriptionIDs()
UNMAPDDID = _spwMap.unmapDDId
def ddIdAdder(acc, dd):
# Our data selection is rather simple: all the rows!
r = UNMAPDDID(dd)
acc[0][ dd ] = (r.FREQID, r.SUBBAND, list(enumerate(_pMap.getPolarizations(r.POLID))))
acc[1][ dd ] = GETF(r.FREQID, r.SUBBAND)/scale
return acc
(self.ddSelection, self.ddFreqs) = functional.reduce(ddIdAdder, ddids, [{}, {}])
## Provide for a label unmapping function.
## After creating the plots we need to transform the labels - some
## of the numerical indices must be unmapped into physical quantities
#unmapBL = mapping.baselineMap.baselineName
#unmapFQ = mapping.spectralMap.freqGroupName
#unmapSRC = mapping.fieldMap.field
unmap_f = { AX.BL: mapping.baselineMap.baselineName,
AX.FQ: mapping.spectralMap.freqGroupName,
AX.SRC: mapping.fieldMap.field,
AX.TIME: lambda t: pyrap.quanta.quantity(t, "s").formatted("time", precision=8) }
identity = lambda x: x
def unmap( fld_val ):
return (fld_val[0], unmap_f.get(fld_val[0], identity)(fld_val[1]))
# flds is the list of field names that the values in the tuple mean
self.MKLAB = lambda flds, tup: plotutil.label( dict(map(unmap, zip(flds, tup))), flds )
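# For example (hypothetical indices/names): with flds = [AX.BL, AX.SRC] the
# tuple (3, 0) might become label({AX.BL: 'WB-EF', AX.SRC: '3C84'}, flds),
# i.e. numerical indices are unmapped to physical names before labelling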
##
## Should return the generated plots according to the following
## structure:
##
## Update: Dec 2015 - we start doing things a little different
## the raw data sets will be delivered as a dict of
## Dict: Key -> Value, where Key is the full data set
## label and Value the dataset() object.
## The division into plots will be done at a higher
## level. Reasons:
## - generation of raw data is faster as only one level
## of dict indexing is needed i.s.o. two
## - if the user changes the new plot settings, we don't
## have to read from disk any more; it is then a mere
## rearrangement of the raw data sets
## - load/store with expressions on data sets now work
## on the one-dimensional 'list' of data sets, no need
## to flatten/unflatten anymore
##
## plots = dict( Key -> Value ) with
## Key = <plot index> # contains physical quantities/labels
## Value = DataSet
## DataSet = instance of 'dataset' (see below) with
## attributes ".x" and ".y"
def makePlots(self, *args):
raise RuntimeError("Someone forgot to implement this function for this plottype")
## Unfortunately, our code relies on the fact that the numarrays returned
## from "ms.getcol()" are 3-dimensional: (nintg x npol x nchannel)
## Sadly, casa is smart/stoopid enough to return no more dimensions
## than are needed; no degenerate axes are present.
## So if your table consists of one row, you get at best a matrix:
## npol x nchannel
## Further, if you happen to read single-pol data, guess what,
## you get a matrix at best and a vector at worst!:
## matrix: nintg x nchannel
## vector: nchannel (worst case: a table with one row of single pol data!)
##
## m3d() can be used to reshape an array to always be at least 3d;
## it inserts degenerate axes before the last one, assuming that there
## won't be data sets with only one row ...
## (single pol does happen! a lot!)
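## A quick sketch of what m3d() (kept below for reference) would do:
##    (nchan,)       -> (1, 1, nchan)      # one row of single-pol data
##    (nrow, nchan)  -> (nrow, 1, nchan)   # shp.insert(-1, 1) pads before the last axis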
#def m3d(ar):
# shp = list(ar.shape)
# while len(shp)<3:
# shp.insert(-1, 1)
# return ar.reshape( shp )
#
#def m2d(ar):
# shp = list(ar.shape)
# while len(shp)<2:
# shp.insert(-1, 1)
# return ar.reshape( shp )
#
#class dataset_org:
# __slots__ = ['x', 'y', 'n', 'a', 'sf', 'm']
#
# @classmethod
# def add_sumy(self, obj, xs, ys, m):
# obj.y = obj.y + ys
# obj.n = obj.n + 1
# obj.m = numpy.logical_and(obj.m, m)
#
# @classmethod
# def init_sumy(self, obj, xs, ys, m):
# obj.x = numpy.array(xs)
# obj.y = numpy.array(ys)
# obj.sf = dataset_org.add_sumy
# obj.m = m
#
# def __init__(self, x=None, y=None, m=None):
# if x is not None and len(x)!=len(y):
# raise RuntimeError("attempt to construct data set where len(x) != len(y)?!!!")
# self.x = list() if x is None else x
# self.y = list() if y is None else y
# self.m = list() if m is None else m
# self.n = 0 if x is None else 1
# self.sf = dataset_org.init_sumy if x is None else dataset_org.add_sumy
# self.a = False
#
# def append(self, xv, yv, m):
# self.x.append(xv)
# self.y.append(yv)
# self.m.append(m)
#
# # integrate into the current buffer
# def sumy(self, xs, ys, m):
# self.sf(self, xs, ys, m)
#
# def average(self):
# if not self.a and self.n>1:
# self.y = self.y / self.n
# self.a = True
#
# def is_numarray(self):
# return (type(self.x) is numpy.ndarray and type(self.y) is numpy.ndarray)
#
# def as_numarray(self):
# if self.is_numarray():
# return self
# # note to self: float32 has insufficient precision for e.g.
# # <quantity> versus time
# self.x = numpy.array(self.x, dtype=numpy.float64)
# self.y = numpy.array(self.y, dtype=numpy.float64)
# self.m = numpy.array(self.m, dtype=numpy.bool)
# return self
#
# def __str__(self):
# return "DATASET: {0} MASK: {1}".format(zip(self.x, self.y), self.m)
#
# def __repr__(self):
# return str(self)
class dataset_fixed:
__slots__ = ['x', 'y', 'm']
def __init__(self, x=None, y=None):
if x is not None and len(x)!=len(y):
raise RuntimeError("attempt to construct data set where len(x) != len(y)?!!!")
self.x = list() if x is None else x
self.y = list() if y is None else y
self.m = False
def append(self, xv, yv, m):
raise NotImplemented("append() does not apply to dataset_fixed!")
# integrate into the current buffer
def sumy(self, xs, ys, m):
raise NotImplemented("sumy() does not apply to dataset_fixed!")
def average(self, method):
raise NotImplemented("average() does not apply to dataset_fixed!")
def is_numarray(self):
return type(self.x) is _ArrayT and type(self.y) is _MArrayT
def as_numarray(self):
if self.is_numarray():
return self
# note to self: float32 has insufficient precision for e.g.
# <quantity> versus time
self.x = ARRAY(self.x, dtype=numpy.float64)
if type(self.y) is not _MArrayT: #numpy.ma.MaskedArray:
self.y = MARRAY(self.y, mask=~ISFINITE(self.y), dtype=numpy.float64)
return self
def __str__(self):
return "DATASET<fixed>: {0}".format(functional.zip_(self.x, self.y))
def __repr__(self):
return str(self)
#################################################################################
#
# .append() means append to list, fastest for collecting individual samples
# .average() verifies that no averaging is requested - this one can't handle that
#
#################################################################################
class dataset_list:
__slots__ = ['x', 'y', 'n', 'a', 'm']
@classmethod
def add_sumy(self, obj, xs, ys, m):
obj.y = obj.y + ys
obj.n = obj.n + 1
obj.m = numpy.logical_and(obj.m, m)
@classmethod
def init_sumy(self, obj, xs, ys, m):
obj.x = numpy.array(xs)
obj.y = numpy.array(ys)
obj.sf = dataset_list.add_sumy
obj.m = m
def __init__(self, x=None, y=None, m=None):
if x is not None and len(x)!=len(y):
raise RuntimeError("attempt to construct data set where len(x) != len(y)?!!!")
self.x = list() if x is None else x
self.y = list() if y is None else y
self.m = list() if m is None else m
self.n = 0 if x is None else 1
self.a = False
def append(self, xv, yv, m):
self.x.append(xv)
self.y.append(yv)
self.m.append(m)
def extend(self, xseq, yseq, mseq):
self.x.extend(xseq)
self.y.extend(yseq)
self.m.extend(mseq)
def average(self, method):
if method != AVG.NoAveraging:
raise RuntimeError("dataset_list was not made for time averaging")
def is_numarray(self):
return type(self.x) is _ArrayT and type(self.y) is _MArrayT
def as_numarray(self):
if self.is_numarray():
return self
# note to self: float32 has insufficient precision for e.g.
# <quantity> versus time
self.x = ARRAY(self.x, dtype=numpy.float64)
self.y = MARRAY(self.y, mask=self.m, dtype=numpy.float64)
return self
def __str__(self):
return "DATASET<list>: len(x)={0}, len(y)={1} len(m)={2}".format(len(self.x), len(self.y), len(self.m))
def __repr__(self):
return str(self)
#################################################################################
#
# Specialization for holding one (1) spectrum. x-axis = channel numbers
# The .add_y method may be called only once
#
#################################################################################
#class dataset_chan_wtf:
# __slots__ = ['x', 'y', 'sf']
#
# @classmethod
# def add_sumy(self, *_):
# raise RuntimeError("dataset_chan was not meant to integrate > 1 spectrum")
#
# @classmethod
# def init_sumy(self, obj, xs, ys, m):
# obj.x = ARRAY(xs)
# obj.y = MARRAY(ys, mask=m)
# obj.sf = dataset_chan.add_sumy
#
# def __init__(self):
# self.x = self.y = None
# self.sf = dataset_chan_wtf.init_sumy
#
# def append(self, xv, yv, m):
# raise NotImplemented("dataset_chan not meant for appending")
#
# def add_y(self, x,v, yv, m):
# self.sf(self, xv, yv, m)
#
# def average(self, method):
# if method != AVG.NoAveraging:
# raise RuntimeError("dataset_chan was not meant for averaging!")
#
# def is_numarray(self):
# return (type(self.x) is numpy.array and type(self.y) is numpy.ma.MaskedArray)
#
# def as_numarray(self):
# if self.is_numarray():
# return self
# # note to self: float32 has insufficient precision for e.g.
# # <quantity> versus time
# self.x = numpy.array(self.x, dtype=numpy.float64)
# self.y = numpy.ma.MaskedArray(self.y, mask=self.m, dtype=numpy.float64)
# return self
#
# def __str__(self):
# return "DATASET<chan>: len(x)={0}, len(y)={1} len(m)={2}".format(len(self.x), len(self.y), len(self.m))
#
# def __repr__(self):
# return str(self)
# Specialization for (potentially) averaging > 1 spectrum when solint'ing
class dataset_chan:
__slots__ = ['x', 'y', 'm', 'sf', 'af']
# 2nd call to .add_y() means that we have to transform
# the mask to integers and remove NaN/Inf (masked) values from
# the previously stored y-values in order to make sure that those
# don't screw up the totals
@classmethod
def add_sumy_first(self, obj, xs, ys, m):
# set masked values to 0 and convert mask to counts in existing object
obj.y[ obj.m ] = 0
obj.m = ARRAY(~obj.m, dtype=numpy.int)
# from now on, averaging has to do something
obj.af = dataset_chan.average_n
# from now on extra .add_y() calls will do something slight different
obj.sf = dataset_chan.add_sumy
# use the new .add_y() to do the "integration" for us
obj.sf(obj, xs, ys, m)
# before 'integrating' the y-values we must
# make sure no NaN/Inf values are present
# because a single NaN/Inf in a channel makes
# the whole summation for that channel go NaN/Inf.
# The reading process auto-flags points which have
# Inf/NaN so we can just set flagged values to 0
@classmethod
def add_sumy(self, obj, xs, ys, m):
ys[ m ] = 0
obj.y = obj.y + ys
obj.m = obj.m + ARRAY(~m, dtype=numpy.int) # transform mask into counts
# very first call to .add_y() just store the parameters
# no fancy processing
@classmethod
def init_sumy(self, obj, xs, ys, m):
obj.x = ARRAY(xs)
obj.y = ARRAY(ys)
obj.m = ARRAY(m) #ARRAY(~m, dtype=numpy.int) # transform mask into counts
obj.sf = dataset_chan.add_sumy_first
obj.af = dataset_chan.average_noop
@classmethod
def average_empty(self, obj, method):
raise RuntimeError("dataset_chan: attempt to average uninitialized dataset (.add_y() never called)")
@classmethod
def average_noop(self, obj, method):
# nothing to average, really
obj.af = None
@classmethod
def average_n(self, obj, method):
# normal average = arithmetic mean i.e. summed value / count of valid values
fn = numpy.divide
if method==AVG.Vectornorm:
# for vector norm we divide by the largest (complex) amplitude
fn = lambda x, _: x/numpy.max(numpy.abs(x))
elif method in [AVG.NoAveraging, AVG.Sum, AVG.Vectorsum]:
fn = lambda x, _: x
# form mask from counts [note: do not clobber obj.m just yet, we need the counts!]
m = ARRAY(obj.m==0, dtype=numpy.bool)
# set counts == 1 where counts were 0 to prevent dividing by 0
obj.m[ m ] = 1
# our various add_y() functions have made sure that no NaN/Inf exist in the data
# so we don't have to blank anything; 's already done
# compute average y
obj.y = fn(obj.y, obj.m)
# replace the counts by the mask
obj.m = m
# and set masked values to NaN because averaging no values has no answer
obj.y[m] = numpy.nan
# and indicate we did do the averaging
obj.af = None
def __init__(self):
self.x = self.y = self.m = None
self.sf = dataset_chan.init_sumy
self.af = dataset_chan.average_empty
def append(self, xv, yv, m):
raise NotImplemented("dataset_chan not meant for appending")
def add_y(self, xv, yv, m):
if not IsArray(xv):
raise RuntimeError("dataset_chan:add_y() adding xv of non-array type! xv = {0}".format(xv))
self.sf(self, xv, yv, m)
def average(self, method):
if self.af is not None:
self.af(self, method)
else:
raise RuntimeError("Calling .average() > once on dataset_chan object?!")
def is_numarray(self):
return type(self.x) is _ArrayT and type(self.y) is _MArrayT
def as_numarray(self):
if self.is_numarray():
return self
if self.af is not None:
raise RuntimeError("Request to convert unaveraged dataset_chan_solint to nd-array?!!")
# note to self: float32 has insufficient precision for e.g.
# <quantity> versus time
self.x = ARRAY(self.x, dtype=numpy.float64)
self.y = MARRAY(self.y, mask=self.m)
return self
def __str__(self):
return "DATASET<chan>: len(x)={0}, len(y)={1} len(m)={2}".format(len(self.x), len(self.y), len(self.m))
def __repr__(self):
return str(self)
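# Minimal usage sketch for dataset_chan (method is a jenums.Averaging value:
# pass-through for NoAveraging/Sum/Vectorsum, division by the largest
# amplitude for Vectornorm, arithmetic mean otherwise):
#   ds = dataset_chan()
#   ds.add_y(chans, spectrum0, mask0)   # first call: store
#   ds.add_y(chans, spectrum1, mask1)   # later calls: sum data, count unflagged
#   ds.average(method)                  # divide per-channel sums by counts
#   ds.as_numarray()                    # .x -> ndarray, .y -> masked array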
#################################################################################
#
# specialization of dataset for grouping multiple channels w/ mask by x value
# (think solint/group by time interval)
#
# the .append() accumulates the channels grouped by the x value
# .average() computes the channel averages over all data collected for each x value
#
#################################################################################
class dataset_solint_array:
__slots__ = ['x', 'y', 'a', 'd', 'm']
def __init__(self):
self.x = self.y = None
self.a = None
self.d = collections.defaultdict(int)
self.m = collections.defaultdict(int)
# this specialization assumes yv, m are instances of numpy.ndarray or numpy.ma.core.MaskedArray
def append(self, xv, yv, m):
# masked data shall not count towards the total for computing the average
#yv[m] = 0
# Also: any floating point aggregation function (sum, mean, etc.) will barf on
# any of the aggregated values being [+-]Inf or NaN - i.e. the net result
# will be NaN/Inf whatever. Therefore we must replace these with 0.
# We'll put back NaN if it turns out that no unmasked values were averaged because
# in such a situation there IS no average/sum/etc. ("what is the sum of no values?"):
# >>> import numpy
# >>> numpy.sum([1,2,numpy.nan])
# nan
yv[ LOGICAL_OR(~ISFINITE(yv), m) ] = 0
# accumulate in the bin for this specific x value
self.d[xv] = numpy.add(yv, self.d[xv])
self.m[xv] = numpy.add(~m, self.m[xv])
def average(self, method):
if self.a is not None:
return
# normal average = arithmetic mean i.e. summed value / count of valid values by default
fn = numpy.divide
if method==AVG.Vectornorm:
# for vector norm we divide by the largest complex amplitude
fn = lambda x, _: x/numpy.max(numpy.abs(x))
elif method in [AVG.NoAveraging, AVG.Sum, AVG.Vectorsum]:
# because we already integrate (==sum) then no averaging equals summing and v.v. :-)
fn = lambda x, _: x
# construct a new dict with the averaged data values and set the mask on whether
# any unmasked values were collected for that x, channel
self.a = dict()
while self.d:
(x, ys) = self.d.popitem()
counts = self.m.pop(x)
# ---- latest ---------------
counts = ARRAY(counts)
mask = ARRAY(counts==0, dtype=numpy.bool)
counts[mask] = 1
data = fn(ARRAY(ys), counts)
# after averaging, points with zero counts should be set to NaN
# to effectively remove them.
data[mask] = numpy.nan
self.a[x] = MARRAY(data, mask=mask)
# ---------------------------
def is_numarray(self):
return type(self.x) is _ArrayT and type(self.y) is _MArrayT
def as_numarray(self):
if self.is_numarray():
return self
# note to self: float32 has insufficient precision for e.g.
# the <time> axis in <quantity> versus time datasets
if self.a is None:
raise RuntimeError("solint dataset has not been averaged yet")
# iterate keys and values of the same dict in matching order (works on py2 and py3)
self.x = numpy.fromiter(iter(self.a), dtype=numpy.float64, count=len(self.a))
self.y = MARRAY(list(self.a.values()))
return self
def __str__(self):
return "DATASET<solint-array>: len(d)={0} len(m)={1}".format(len(self.d), len(self.m))
def __repr__(self):
return str(self)
##################################################################################
##
## specialization of dataset for grouping a single channels w/ mask by x value
## (think solint/group by time interval)
##
## the .append() accumulates the values grouped by the x value
## .average() computes the value averages over all data collected for each x value
##
##################################################################################
class dataset_solint_scalar:
__slots__ = ['x', 'y', 'a', 'd', 'm']
def __init__(self):
self.x = self.y = None
self.a = None
self.d = collections.defaultdict(list)
self.m = collections.defaultdict(int)
# this specialization assumes yv, m are scalar value + boolean (or anything
# indicating the truth of yv)
def append(self, xv, yv, m):
# don't let masked or Inf/NaN values count towards the total before averaging
self.d[xv].append( 0 if m or not ISFINITE(yv) else yv )
# do count truth values
self.m[xv] += 0 if m else 1
def average(self, method):
if self.a is not None:
return
# normal average = arithmetic mean i.e. summed value / count of valid values
fn = operator.truediv
if method==AVG.Vectornorm:
# for vector norm we divide by the largest complex amplitude
fn = lambda x, _: x/max(map(abs,x))
elif method in [AVG.NoAveraging, AVG.Sum, AVG.Vectorsum]:
# because our data is already summed then no averaging == summing
fn = lambda x, _: x
# construct a new dict with the averaged data values and set mask based on
# number of unmasked
self.a = dict()
while self.d:
(x, ys) = self.d.popitem()
counts = self.m.pop(x)
# ---- latest ---------------
# if no valid data at all substitute a value of nan
self.a[x] = fn(sum(ys), counts) if counts else numpy.nan
# ---------------------------
def is_numarray(self):
return type(self.x) is _ArrayT and type(self.y) is _MArrayT
def as_numarray(self):
if self.is_numarray():
return self
# note to self: float32 has insufficient precision for e.g.
# the <time> axis in <quantity> versus time datasets
if self.a is None:
raise RuntimeError("solint dataset has not been averaged yet")
# iterate keys and values of the same dict in matching order (works on py2 and py3)
self.x = numpy.fromiter(iter(self.a), dtype=numpy.float64, count=len(self.a))
self.y = MARRAY(list(self.a.values()))
return self
def __str__(self):
return "DATASET<solint-scalar>: len(d)={0} len(m)={1}".format(len(self.d), len(self.m))
def __repr__(self):
return str(self)
## Partition a data set into two separate data sets,
## one with those elements satisfying the predicate,
## the other with those that don't.
## Returns (ds_true, ds_false)
##
## Implementation note:
## Yes, there is hvutil.partition() which does much the same but
## using a reduce(). The problem is that it expects a single list of values
## to which to apply the predicate.
## In order to turn a dataset() into a single list, we'd have to
## zip() the ".x" and ".y" lists. After having partition'ed the list,
## we'd have to unzip them again into separate ".x" and ".y" arrays,
## for the benefit of PGPLOT.
## Summarizing: in order to use hvutil.partition() we'd have to do two (2)
## cornerturning operations, which seems to be wasteful.
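## For example (hypothetical expression): partitioner("y > 2*avg") compiles a
## throwaway module whose f(x, y) evaluates the expression with numpy/math
## names in scope and avg/sd/xmin/xmax/ymin/ymax as placeholders (presumably
## filled in by the caller); data points are then split on the truth of f.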
class partitioner:
def __init__(self, expr):
# solution found in:
# http://stackoverflow.com/questions/10303248/true-dynamic-and-anonymous-functions-possible-in-python
self.code = compile(
"from numpy import *\n"+
"from math import *\n"+
"avg = None\n"+
"sd = None\n"+
"xmin = None\n"+
"xmax = None\n"+
"ymin = None\n"+
"ymax = None\n"+
"f = lambda x, y: "+expr,
'dyn-mark-string', 'exec')
self.mod = imp.new_module("dyn_marker_mod")
exec(self.code, self.mod.__dict__)