-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathxmp_functions.py
484 lines (430 loc) · 22.2 KB
/
xmp_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
# Functions for Python-based XMP Tool
#
# Author: Peter Jakubowski
# Date: 11/18/2023
# Description: Python functions to work with
# XMP metadata using Python XMP Toolkit.
#
# Import necessary packages
from libxmp import XMPFiles, XMPMeta
from libxmp.utils import object_to_dict
import json
import pandas as pd
import os
import glob
from schemas import *
class Asset:
    """
    Structure to store information about an image file and its metadata.

    Keeps the path and filename of the image, the flickr id parsed from the
    filename, the xmp packet read from the file, the flickr annotations loaded
    from json, and a counter of the changes made to the xmp packet.
    """

    def __init__(self, path):
        """
        :param path: String, path to the image file
        """
        self.path = path  # path to image file
        self.filename = os.path.basename(path)  # the file's filename
        self.flickr_id = None  # flickr id as string
        self.xmp = None  # xmp metadata packet
        self.xmp_dict = {}  # xmp metadata in a standard Python dictionary
        self.flickr_data = {}  # flickr annotations in a standard Python dictionary
        self.xmp_updates = 0  # number of updates made to xmp packet

    @staticmethod
    def _is_ascii_digits(text):
        """
        Returns True if text is non-empty and contains only the digits 0-9.
        """
        return text != "" and all("0" <= ch <= "9" for ch in text)

    def _array_items(self, ns, prop):
        """
        Returns the items currently stored in an xmp array property as a list.
        :param ns: xmp namespace uri of the array
        :param prop: name of the array property
        """
        count = self.xmp.count_array_items(schema_ns=ns, array_name=prop)
        # the xmp array is read with indexes starting at 1
        return [self.xmp.get_array_item(schema_ns=ns, array_prop_name=prop, index=i)
                for i in range(1, count + 1)]

    def _append_new_array_items(self, ns, prop, value_form, items, existing):
        """
        Appends every item that is not empty and not already present to an
        xmp array property, counting one update per appended item.
        :param ns: xmp namespace uri of the array
        :param prop: name of the array property
        :param value_form: 'ordered' or 'unordered', key into VALUE_FORMS
        :param items: iterable of candidate values, in the order to append
        :param existing: list of values already stored in the array
        """
        # a list (not a set) so that unhashable values never raise here
        seen = list(existing)
        for item in items:
            if item == "" or item in seen:
                continue
            self.xmp.append_array_item(schema_ns=ns,
                                       array_name=prop,
                                       item_value=item,
                                       array_options=VALUE_FORMS[value_form])
            seen.append(item)
            self.xmp_updates += 1

    def get_flickr_id(self):
        """
        Extracts the flickr id from the filename.

        The extension and a trailing "_o" marker are removed, the remainder is
        split on underscores, and the last token (or, failing that, the second
        to last token) is used as the id when it consists only of the digits
        0-9. When neither token qualifies, flickr_id is left unchanged.
        """
        # remove the extension and the trailing "_o" as real suffixes
        # (str.strip would remove a *set of characters* from both ends)
        stem = os.path.splitext(self.filename)[0]
        if stem.endswith("_o"):
            stem = stem[:-2]
        id_options = stem.split("_")
        # look at the last option first, then at the one before it
        for option in reversed(id_options[-2:]):
            if self._is_ascii_digits(option):
                self.flickr_id = option
                break
        return

    def get_xmp_packet(self):
        """
        Opens the file read-only, stores its xmp metadata packet in self.xmp
        and closes the file again.
        """
        xmp_file = XMPFiles(file_path=self.path, open_forupdate=False)
        try:
            # get the xmp packet from the file and save it
            self.xmp = xmp_file.get_xmp()
        finally:
            # always release the file handle, even if reading the packet fails
            xmp_file.close_file()
        return

    def get_xmp_dict(self):
        """
        Extracts all XMP data from the stored XMPMeta instance, organizing it
        into a standard Python dictionary. Does nothing when no packet is loaded.
        """
        if self.xmp is not None:
            self.xmp_dict = object_to_dict(self.xmp)
        return

    def replace_xmp_packet(self):
        """
        Replaces the xmp packet in the file with the xmp packet held by this
        object, provided a packet is loaded and at least one update was made.
        """
        if self.xmp is None or self.xmp_updates <= 0:
            print(f'No xmp or updates to save to {self.filename} file!')
            return
        # open the file for updating the xmp packet
        xmp_file = XMPFiles(file_path=self.path, open_forupdate=True)
        try:
            # check if we can put our xmp packet in the file
            if xmp_file.can_put_xmp(self.xmp):
                xmp_file.put_xmp(self.xmp)
            else:
                print(f'xmp packet can NOT be put in {self.filename} file!')
        finally:
            # close the file in every case so the handle is never leaked
            xmp_file.close_file()
        return

    def retrieve_flickr_json(self, path):
        """
        Opens the json file containing flickr annotations for the corresponding
        image ("photo_<flickr id>.json") and stores its content in flickr_data.
        Nothing is loaded when the flickr id is unknown, the directory does not
        exist or the json file does not exist.
        :param path: path to directory where json files are saved
        """
        # without a flickr id the json filename cannot be constructed
        if self.flickr_id is None:
            return
        # check that the path is valid
        if not os.path.exists(path):
            return
        # construct the name of the json file
        json_file = os.path.join(path, "photo_" + self.flickr_id + ".json")
        # check that the file exists
        if os.path.exists(json_file):
            with open(json_file, encoding="utf-8") as j:
                self.flickr_data = json.load(j)
        return

    def merge_flickr_data(self):
        """
        Merges flickr annotations with the file's xmp metadata.
        Uses FLICKR_SCHEMA, a dictionary of flickr annotations and xmp metadata
        mappings where the key is the name of the flickr field and the value is
        a tuple with the xmp namespace in the first position, the property name
        in the second position, and the value form of the property
        (simple, unordered, ordered, alternative) in the third position.

        Array properties receive only the values they do not contain yet;
        alternative and simple properties are replaced when their current value
        differs from the flickr value. Every change increments xmp_updates.
        """
        if self.xmp is None:
            print(f"{self.filename} does not contain an xmp packet")
            return
        for key, (ns, prop, value_form) in FLICKR_SCHEMA.items():
            if key not in self.flickr_data:
                print(f"FLICKR_SCHEMA key: {key} does not exist in flickr data!")
                continue
            fd = self.flickr_data[key]
            # skip empty strings and empty lists
            if fd == "" or fd == []:
                continue
            # tags are provided in a list of dictionaries in the flickr json,
            # convert them to a list of tag values (order kept, so the result
            # is the same on every run)
            if key == 'tags':
                fd = [d['tag'] for d in fd]
            exists = self.xmp.does_property_exist(schema_ns=ns, prop_name=prop)
            if value_form == 'unordered' or value_form == 'ordered':
                # a single string is one item, not a sequence of characters;
                # copy lists so the flickr data itself is never modified
                items = [fd] if isinstance(fd, str) else list(fd)
                existing = self._array_items(ns, prop) if exists else []
                self._append_new_array_items(ns, prop, value_form, items, existing)
            elif value_form == 'alternative':
                if exists:
                    # retrieve the value currently in xmp metadata
                    val = self.xmp.get_localized_text(schema_ns=ns,
                                                      alt_text_name=prop,
                                                      generic_lang='en',
                                                      specific_lang='en-US')
                    if val == fd:
                        continue
                # for now, replace the xmp value with the flickr data
                self.xmp.set_localized_text(schema_ns=ns,
                                            alt_text_name=prop,
                                            generic_lang='en',
                                            specific_lang='x-default',
                                            prop_value=fd)
                self.xmp_updates += 1
            else:
                # the property is not an array, so it is treated as a string
                new_value = str(fd)
                if exists and self.xmp.get_property(schema_ns=ns, prop_name=prop) == new_value:
                    continue
                # for now, replace the xmp value with the flickr data
                self.xmp.set_property(schema_ns=ns, prop_name=prop, prop_value=new_value)
                self.xmp_updates += 1
        return

    def merge_csv_data(self, metadata):
        """
        Merges metadata provided in a csv file with the file's xmp metadata.
        Empty values are ignored. Simple and alternative properties are set
        when they are missing or differ; array properties are rebuilt from the
        comma separated value when their current items differ from it.
        Every change increments xmp_updates.
        :param metadata: Dictionary, metadata from csv file where keys are the
        XMP prefix and property name formatted like "prefix:property"
        """
        if self.xmp is None:
            print(f"{self.filename} does not contain an xmp packet")
            return
        for key, value in metadata.items():
            # check that the data is not an empty string
            if value == "":
                continue
            # split the key into xmp prefix and property values
            parts = key.split(":")
            if len(parts) != 2:
                print("key is not valid or formatted incorrectly")
                continue
            prefix, prop = parts
            # check that the xmp prefix and property are included in the schema
            if prefix not in SCHEMA or prop not in SCHEMA[prefix]:
                print((f"XMP property with prefix '{prefix}' "
                       f"and property name '{prop}' is not defined in schema."))
                continue
            value_form = SCHEMA[prefix][prop]
            # get the xmp namespace uri for prefix
            ns = XMPMeta.get_namespace_for_prefix(prefix)
            prop_exists = self.xmp.does_property_exist(schema_ns=ns, prop_name=prop)
            if value_form == "simple":
                if (not prop_exists or
                        self.xmp.get_property(schema_ns=ns, prop_name=prop) != value):
                    self.xmp.set_property(schema_ns=ns, prop_name=prop, prop_value=value)
                    self.xmp_updates += 1
            elif value_form == "ordered" or value_form == "unordered":
                # convert the value to a list of non-empty values
                new_items = [v.strip(" ") for v in value.split(",")]
                new_items = [v for v in new_items if v != ""]
                if not new_items:
                    continue
                if prop_exists:
                    # nothing to do when the array already holds these values
                    if self._array_items(ns, prop) == new_items:
                        continue
                    # delete the property and create a new array
                    self.xmp.delete_property(schema_ns=ns, prop_name=prop)
                for val in new_items:
                    self.xmp.append_array_item(schema_ns=ns,
                                               array_name=prop,
                                               item_value=val,
                                               array_options=VALUE_FORMS[value_form])
                    self.xmp_updates += 1
            elif value_form == "alternative":
                if (not prop_exists or
                        self.xmp.get_localized_text(schema_ns=ns,
                                                    alt_text_name=prop,
                                                    generic_lang='en',
                                                    specific_lang='x-default') != value):
                    # set the localized text
                    self.xmp.set_localized_text(schema_ns=ns,
                                                alt_text_name=prop,
                                                generic_lang='en',
                                                specific_lang='x-default',
                                                prop_value=value)
                    self.xmp_updates += 1
        return

    def retrieve_xmp_metadata(self):
        """
        Retrieves metadata values from xmp packet. Utilizes the SCHEMA dictionary
        of xmp prefixes and property names to extract specific values.
        Properties that are missing (or all of them, when no packet is loaded)
        are reported as empty strings; array items are joined with ", ".
        :return: list starting with the filename, followed by one value for
        each property defined in SCHEMA
        """
        # initialize a list to keep metadata
        metadata = [self.filename]
        if self.xmp is None:
            # report the missing packet once, then pad the row with empty values
            print(f"{self.filename} does not contain an xmp packet")
            for properties in SCHEMA.values():
                metadata.extend("" for _ in properties)
            return metadata
        for prefix, properties in SCHEMA.items():
            # get the xmp namespace uri for prefix
            ns = XMPMeta.get_namespace_for_prefix(prefix)
            for prop, value_form in properties.items():
                val = ""
                if self.xmp.does_property_exist(schema_ns=ns, prop_name=prop):
                    if value_form == 'unordered' or value_form == 'ordered':
                        # convert the array to a string
                        val = ", ".join(self._array_items(ns, prop))
                    elif value_form == 'alternative':
                        val = self.xmp.get_localized_text(schema_ns=ns,
                                                          alt_text_name=prop,
                                                          generic_lang='en',
                                                          specific_lang='en-US')
                    else:
                        # the property is a string
                        val = self.xmp.get_property(schema_ns=ns, prop_name=prop)
                # append the xmp property value to the list of metadata values
                metadata.append(val)
        return metadata

    def __str__(self):
        # format a string with image attributes
        return f"Filename: {self.filename}\nPath: {self.path}\nFlickr photo id: {self.flickr_id}"
def get_file_paths(path):
    """
    Function takes in a path to a directory or file
    and returns a list of files.

    For a directory, the paths of the "*.jpg" files directly inside it are
    returned in sorted order. For a file, the list contains just that path.
    For anything else a message is printed and the list is empty.
    :param path: String, path to directory or file
    :return: list of files
    """
    # keep a list of files
    lst = []
    if not os.path.exists(path):
        # if the path is not a valid path, we should let the user know
        print('the path is not a valid path')
    elif os.path.isdir(path):
        # escape the directory so characters such as "[" or "*" in its name
        # are matched literally instead of being read as glob wildcards
        pattern = os.path.join(glob.escape(path), "*.jpg")
        # glob returns files in arbitrary order, sort for a stable result
        lst = sorted(glob.glob(pattern))
    elif os.path.isfile(path):
        # append file path to the list
        lst.append(path)
    else:
        print('path is not to a directory or a file')
    return lst
def make_file_objects(files):
    """
    Wraps every file path in an object of class Asset.
    :param files: list of file paths
    :return: list of objects of class Asset, in the same order as files
    """
    # build one Asset per path
    return [Asset(file_path) for file_path in files]
def load_files(path):
    """
    Wrapper function that collects the file paths found at path
    and converts them to objects.
    :param path: path to file or directory
    :return: list of objects
    """
    # resolve the path to file paths, then turn each one into an object
    return make_file_objects(get_file_paths(path))
def load_csv(path):
    """
    Loads a csv file from a path provided in arguments.

    All cells are read as strings and empty cells become empty strings.
    The rows are keyed by the value of the 'filename' column.
    :param path: String, path to csv file
    :return: csv data as a dictionary of the form
    {filename: {column name: value}}, or an empty dictionary when the
    path is not a file or the csv has no 'filename' column
    """
    # check if the path is valid
    if not os.path.isfile(path):
        print("path to csv file is not valid!")
        return {}
    # load the csv file using pandas; keep_default_na=False keeps literal
    # values such as "NA" or "null" as text instead of treating them as missing
    df = pd.read_csv(path, index_col=False, dtype=str, keep_default_na=False)
    # the filename column is required to key the rows
    if 'filename' not in df.columns:
        print("csv file does not contain a 'filename' column!")
        return {}
    # set the index to the filename column
    df = df.set_index(keys='filename', drop=True)
    # fill any remaining missing values with empty strings
    df = df.fillna("")
    # convert the dataframe to a dictionary
    return df.to_dict(orient='index')
def create_data_frame(data):
    """
    Builds a pandas dataframe from retrieved xmp metadata. The first column is
    "filename"; the remaining columns are named "prefix:property" for every
    property listed in the SCHEMA dictionary, in SCHEMA order.
    :param data: retrieved xmp metadata
    :return: pandas dataframe holding data under the schema column names
    """
    # start with the filename column
    column_names = ["filename"]
    # add one column per schema property
    for prefix, properties in SCHEMA.items():
        column_names.extend(prefix + ":" + prop for prop in properties)
    return pd.DataFrame(data=data, columns=column_names)