-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathPT_classifier.py
executable file
·423 lines (329 loc) · 16.4 KB
/
PT_classifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
import re
import common_utils
from operator import add
from functools import reduce
from collections import defaultdict
import abc
import logging
from dotty_dict import Dotty
log = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Apply patch to Dotty.get() method
# -----------------------------------------------------------------------------
def patch_get(self, key, default=None):
    """Return the value stored at ``key``, or ``default`` when the lookup fails.

    Mirrors ``dict.get`` but accepts a deeply nested key in dot notation.
    Besides ``KeyError``, an ``IndexError`` raised during the lookup is also
    treated as "key not found".

    :param str key: Single key or chain of keys
    :param Any default: Value handed back if the deep key does not exist
    :return: The stored value, or ``default``
    """
    try:
        value = self.__getitem__(key)
    except (KeyError, IndexError):
        # Either failure means the requested path is absent.
        return default
    return value
Dotty.get = patch_get
# -----------------------------------------------------------------------------
# Define module dictionaries
# -----------------------------------------------------------------------------
# http://dicom.nema.org/medical/dicom/2015c/output/chtml/part16/sect_CID_4021.html
# Maps the CodeValue found in RadiopharmaceuticalCodeSequence to the short
# tracer name stored in classification['Tracer'].
TRACER_CODES = {
    'C-B1031': 'FDG', # Fluorodeoxyglucose
    '126501': 'FBB', # Florbetaben
    'C-E0269': 'FBP', # Florbetapir = AV45
    'C-E0267': 'FMM', # Flutemetamol
    'C1831937': 'FES', # Fluoroestradiol
    '126500': 'PiB' # Pittsburgh compound B
}
# Maps free-text tracer descriptions to the short tracer name. Keys are
# lower-cased before comparison by the tracer sub-classifier, so matching is
# case-insensitive but otherwise exact.
TRACER_MEANINGS = {
    # First, the allowed values of CodeMeaning for the tracer, i.e., (0008,0104)
    'Fluorodeoxyglucose F^18^': 'FDG',
    'Florbetaben F^18^': 'FBB',
    'Florbetapir F^18^': 'FBP',
    'Flutemetamol F^18^': 'FMM',
    'Fluoroestradiol (FES) F^18^': 'FES',
    'Pittsburgh compound B C^11^': 'PiB',
    # Then, values that have been found in the Radiopharmaceutical tag.
    # NOTE: not sure this can work reliably -- Radiopharmaceutical may be
    # free-form text.
    'FDG -- fluorodeoxyglucose': 'FDG',
    'Fluorodeoxyglucose': 'FDG',
    # 'HMDP' has no entry in TRACER_TO_ISOTOPE, so no isotope is inferred for it.
    'HMDP': 'HMDP',
}
TRACER_TO_ISOTOPE = { # For some tracers, the tracer dictates the isotope
    'FDG': 'F18',
    'FBB': 'F18',
    'FBP': 'F18',
    'FMM': 'F18',
    'FES': 'F18',
    'PiB': 'C11'
}
# http://dicom.nema.org/medical/dicom/2015c/output/chtml/part16/sect_CID_4020.html
# Maps the CodeValue found in RadionuclideCodeSequence to the isotope name
# stored in classification['Isotope'].
ISOTOPE_CODES = {
    'C-111A1': 'F18',
    'C-105A1': 'C11',
    'C-168A4': 'Zr89',
    # NOTE(review): technetium's element symbol is Tc; 'T99m' may be the
    # spelling expected by the downstream classification schema -- confirm.
    'C-163A8': 'T99m'
}
# Maps the CodeMeaning found in RadionuclideCodeSequence to the isotope name.
# Compared case-insensitively by the isotope sub-classifier.
ISOTOPE_MEANINGS = {
    '^18^Fluorine': 'F18',
    '99m Technetium': 'T99m'
}
class PTSubClassifier(abc.ABC):
    """
    An abstract base class that's the sub-component in the composite design
    pattern. Currently, this sub-component is used to define only leaves.
    The composite of its leaves is defined as a concrete implementation of
    the parent (abstract) component.

    All leaves will define the method 'classify', which returns
    classifications and info_object parameters.
    """

    def __init__(self, header_dicom: dict, acquisition):
        """
        Args:
            header_dicom (dict): This is just the dicom header info similar to file.info['header']['dicom'].
            acquisition (flywheel.Acquisition): A flywheel acquisition container object
        """
        # Wrapped in Dotty so nested tags can be addressed with dotted keys.
        self.header_dicom = Dotty(header_dicom)
        self.acquisition = acquisition
        self.label = acquisition.label

    @abc.abstractmethod
    def classify(self, classifications, info_object):
        """Returns updated classifications and info_object

        Args:
            classifications (dict): A dictionary matching flywheel modality specific classification. Note the
                classification for a modality can be fetched with `fw.get_modality('PT')['classification']`
                for a PT modality for instance.
            info_object (dict): Info dictionary attribute of a file object.

        Raises:
            NotImplementedError: Always, when reached through ``super().classify(...)``;
                concrete leaves must provide their own implementation.
        """
        # NotImplementedError is the exception; the NotImplemented singleton
        # cannot be raised (doing so produces a TypeError instead).
        raise NotImplementedError

    def get_dicom_tag(self, dotty_key: str):
        """Returns the value of the dicom header at dotty_key location.

        Args:
            dotty_key (str): A string to reference the location of the targeted value
                (e.g. 'RadiopharmaceuticalInformationSequence.0.RadionuclideCodeSequence.0.CodeValue')

        Returns:
            The result of ``self.header_dicom.get(dotty_key)``.
        """
        return self.header_dicom.get(dotty_key)

    @staticmethod
    def warn_if_isotope_different_from_previously_found(
            isotope, classification):
        """Log a warning when ``isotope`` disagrees with what is already classified.

        Nothing is logged when no isotope has been recorded yet, or when
        ``isotope`` is None.

        Args:
            isotope (str or None): The isotope candidate that was just derived.
            classification (dict): Classification being built; its 'Isotope' entry is read.
        """
        if (classification['Isotope']
                and isotope is not None
                and isotope not in classification['Isotope']):
            log.warning(f'Isotope from CodeMeaning ({isotope}) is different from the one previously found '
                        f'({classification["Isotope"]})')
class IsotopePTSubClassifier(PTSubClassifier):
    """Leaf that derives the isotope from the first RadionuclideCodeSequence item."""

    def classify(self, classifications, info_object):
        """Returns updated classifications and info_object

        The CodeValue lookup runs first, then the CodeMeaning lookup; an
        isotope is only recorded while none has been recorded yet.

        Args:
            classifications (dict): A dictionary matching flywheel modality specific classification. Note the
                classification for a modality can be fetched with `fw.get_modality('PT')['classification']`
                for a PT modality for instance.
            info_object (dict): Info dictionary attribute of a file object.
        """
        after_code = self.classify_based_on_isotope_code(classifications, info_object)
        return self.classify_based_on_isotope_meaning(*after_code)

    def classify_based_on_isotope_code(self, classification, info_object):
        """Returns updated classifications and info_object with Isotope Code info."""
        radionuclide_code = self.get_dicom_tag(
            'RadiopharmaceuticalInformationSequence.0.RadionuclideCodeSequence.0.CodeValue')
        if radionuclide_code not in ISOTOPE_CODES:
            # Unknown or missing code: nothing to record.
            return classification, info_object

        found = ISOTOPE_CODES[radionuclide_code]
        self.warn_if_isotope_different_from_previously_found(
            isotope=found, classification=classification)
        if found and not classification['Isotope']:
            classification['Isotope'].append(found)
        return classification, info_object

    def classify_based_on_isotope_meaning(self, classification, info_object):
        """Returns updated classifications and info_object with Isotope Meaning info."""
        radionuclide_meaning = self.get_dicom_tag(
            'RadiopharmaceuticalInformationSequence.0.RadionuclideCodeSequence.0.CodeMeaning')
        # Case-insensitive view of the known meanings.
        by_lowercase_meaning = {meaning.lower(): name for meaning, name in ISOTOPE_MEANINGS.items()}

        if not radionuclide_meaning or radionuclide_meaning.lower() not in by_lowercase_meaning:
            return classification, info_object

        found = by_lowercase_meaning[radionuclide_meaning.lower()]
        self.warn_if_isotope_different_from_previously_found(
            isotope=found, classification=classification)
        if found and not classification['Isotope']:
            classification['Isotope'].append(found)
        return classification, info_object
class ProcessingPTSubClassifier(PTSubClassifier):
    """Leaf that flags attenuation-corrected series in classification['Processing']."""

    def classify(self, classification, info_object):
        """Returns updated classification and info_object

        Args:
            classification (dict): A dictionary matching flywheel modality specific classification. Note the
                classification for a modality can be fetched with `fw.get_modality('PT')['classification']`
                for a PT modality for instance.
            info_object (dict): Info dictionary attribute of a file object.
        """
        return self.classify_attenuation_corrected(classification, info_object)

    def classify_attenuation_corrected(self, classification, info_object):
        """Returns updated classification and info_object with Processing info"""
        ac_method = self.get_dicom_tag('AttenuationCorrectionMethod')
        corrected_image = self.get_dicom_tag('CorrectedImage')

        # Evidence is consulted in priority order and stops at the first hit:
        #   1. a non-empty AttenuationCorrectionMethod tag,
        #   2. 'ATTN' listed in CorrectedImage,
        #   3. "AC" appearing in the acquisition label.
        # NOTE(review): (3) is a plain substring test, so a label containing
        # "NAC" also matches -- confirm that is intended.
        is_attenuation_corrected = bool(
            ac_method
            or (corrected_image and 'ATTN' in corrected_image)
            or (self.label and "AC" in self.label)
        )

        if is_attenuation_corrected:
            classification['Processing'].append('Attenuation Corrected')
        return classification, info_object
class TracerPTSubClassifier(PTSubClassifier):
    """Leaf that derives the tracer (and the isotope it implies) from the dicom header."""

    def classify(self, classification, info_object):
        """Returns updated classification and info_object.

        Args:
            classification (dict): A dictionary matching flywheel modality specific classification. Note the
                classification for a modality can be fetched with `fw.get_modality('PT')['classification']`
                for a PT modality for instance.
            info_object (dict): Info dictionary attribute of a file object.
        """
        after_code = self.classify_based_on_tracer_code(classification, info_object)
        return self.classify_based_on_tracer_meaning_or_radiopharmaceutical(*after_code)

    @staticmethod
    def _append_if_unset(classification, tracer, isotope):
        """Record tracer / isotope, each only when that classification entry is still empty."""
        if tracer and not classification['Tracer']:
            classification['Tracer'].append(tracer)
        if isotope and not classification['Isotope']:
            classification['Isotope'].append(isotope)

    def classify_based_on_tracer_code(self, classification, info_object):
        """Returns updated classification and info_object with Tracer code info."""
        tracer = isotope = None
        tracer_code = self.get_dicom_tag(
            'RadiopharmaceuticalInformationSequence.0.RadiopharmaceuticalCodeSequence.0.CodeValue')
        if tracer_code in TRACER_CODES:
            tracer = TRACER_CODES[tracer_code]
            isotope = TRACER_TO_ISOTOPE.get(tracer)
            self.warn_if_isotope_different_from_previously_found(
                isotope=isotope, classification=classification)
        self._append_if_unset(classification, tracer, isotope)
        return classification, info_object

    def classify_based_on_tracer_meaning_or_radiopharmaceutical(self, classification, info_object):
        """Returns updated classification and info_object with Tracer Code Meaning info."""
        tracer = isotope = None
        # Case-insensitive view of the known tracer descriptions.
        by_lowercase_meaning = {meaning.lower(): name for meaning, name in TRACER_MEANINGS.items()}

        # First source: CodeMeaning of the radiopharmaceutical code sequence.
        tracer_meaning = self.get_dicom_tag(
            'RadiopharmaceuticalInformationSequence.0.RadiopharmaceuticalCodeSequence.0.CodeMeaning')
        if tracer_meaning and tracer_meaning.lower() in by_lowercase_meaning:
            tracer = by_lowercase_meaning[tracer_meaning.lower()]
            isotope = TRACER_TO_ISOTOPE.get(tracer)
            self.warn_if_isotope_different_from_previously_found(
                isotope=isotope, classification=classification)
        self._append_if_unset(classification, tracer, isotope)

        # Second source: the Radiopharmaceutical tag. Its location could vary
        # by manufacturer, so two places are tried in turn.
        radiopharmaceutical = self.get_dicom_tag(
            'RadiopharmaceuticalInformationSequence.0.RadionuclideCodeSequence.0.Radiopharmaceutical')
        if radiopharmaceutical is None:
            radiopharmaceutical = self.get_dicom_tag(
                'RadiopharmaceuticalInformationSequence.0.Radiopharmaceutical')
        if radiopharmaceutical and radiopharmaceutical.lower() in by_lowercase_meaning:
            tracer = by_lowercase_meaning[radiopharmaceutical.lower()]
            isotope = TRACER_TO_ISOTOPE.get(tracer)
            self.warn_if_isotope_different_from_previously_found(
                isotope=isotope, classification=classification)
        self._append_if_unset(classification, tracer, isotope)

        return classification, info_object
class BaseModalityClassifier(abc.ABC):
    """Modality Classifier abstract class.

    This is the main component in the composite design pattern. Concrete
    implementations of this adds leaves of a sub-composite class (e.g.,
    PTSubClassifier) to create a composite of those leaves (e.g.,
    PTClassifier). In this way, all composites and leaves can be treated
    the same way (i.e., use the same arguments and methods). Further
    explanation is below.

    There are two abstract base classes involved (component and
    sub-component): one for the modality and the other--a sub-classifier--for
    the classifications of a modality. The base modality class simply
    defines which concrete sub-classifiers (or leaves of a sub-component
    class) a modality will use, essentially creating a composite of those
    leaves (a sub-composite in the overall scheme).

    Concrete sub-classifier classes (leaves) are added to the modality's
    class variable list, sub_classifiers, to create a sub-composite. When a
    concrete modality class is instantiated, all concrete sub-classifiers are
    instantiated and stored in the modality's instance variable,
    self.classifiers. Calling the instantiated modality class'
    self.classify() method will invoke every sub-classifier's classify()
    method in list order. The passed arguments, classification and
    info_object, are updated as they pass through all sub-classifiers.

    Args:
        header_dicom (dict): This is just the dicom header info similar to file.info['header']['dicom'].
        acquisition (flywheel.Acquisition): A flywheel acquisition container object

    Attributes:
        sub_classifiers (list): List of SubClassifier class that will be applied.
            Left as None (or empty), no sub-classifier is applied.
    """
    sub_classifiers = None

    def __init__(self, header_dicom, acquisition):
        self.header_dicom = header_dicom
        self.acquisition = acquisition
        # `sub_classifiers` defaults to None on the base class; iterating it
        # directly would fail with an opaque TypeError, so None is treated as
        # "no leaves".
        self.classifiers = [
            sub_classifier(self.header_dicom, self.acquisition)
            for sub_classifier in (self.sub_classifiers or ())
        ]

    def classify(self, classification, info_object):
        """Returns updated classification and info_object

        Args:
            classification (dict): A dictionary matching flywheel modality specific classification. Note the
                classification for a modality can be fetched with `fw.get_modality('PT')['classification']`
                for a PT modality for instance.
            info_object (dict): Info dictionary attribute of a file object.

        Returns:
            tuple: ``(classification, info_object)`` as returned by the last
            sub-classifier. The classification handed to the first
            sub-classifier is a ``defaultdict(list)`` copy of the input.
        """
        # make classification a defaultdict with default=list, so leaves can
        # read/append to entries that do not exist yet
        classification = defaultdict(list, classification)
        for classifier in self.classifiers:
            classification, info_object = classifier.classify(classification, info_object)
        return classification, info_object
class PTClassifier(BaseModalityClassifier):
    """Modality classifier for PT.

    Applies the isotope, tracer and processing sub-classifiers, in that
    order.
    """
    sub_classifiers = [IsotopePTSubClassifier, TracerPTSubClassifier, ProcessingPTSubClassifier]
def classify_PT(df, dcm_metadata, acquisition):
    '''
    Classifies a PT dicom series

    Args:
        df (DataFrame): A pandas DataFrame where each row is a dicom image header information
        dcm_metadata (dict): File metadata; the dicom header is read from
            dcm_metadata['info']['header']['dicom']. Updated in place.
        acquisition: The acquisition container, handed to the anatomy and PT classifiers

    Returns:
        dict: The same dcm_metadata, with 'info' updated from the computed
            info object and 'classification' set to the PT classification
    '''
    log.info("Determining PT Classification...")

    dicom_header = dcm_metadata['info']['header']['dicom']
    # A missing or empty SeriesDescription is normalised to ''.
    description = header_value if (header_value := None) else (dicom_header.get('SeriesDescription') or '')

    # Compute scan coverage
    coverage, file_info = common_utils.compute_scan_coverage_if_original(
        dicom_header, df, {})

    # Classify Anatomy
    labels = common_utils.classify_anatomy({}, acquisition, description, coverage)

    # Classify Isotope, Processing, Tracer
    labels, file_info = PTClassifier(
        header_dicom=dicom_header, acquisition=acquisition
    ).classify(labels, file_info)

    dcm_metadata['info'].update(file_info)
    dcm_metadata['classification'] = labels
    return dcm_metadata