# app.py
import json
import multiprocessing
import os
import pickle
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path

import filedate
import requests

from init_photo_service import service
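
# `init_photo_service` is a local module in this repo; it is expected to expose
# `service`, an authenticated Google Photos Library API client. A minimal
# sketch of how such a client is typically built (assumes OAuth credentials
# `creds` already obtained, e.g. via google-auth-oauthlib):
#
#     from googleapiclient.discovery import build
#     service = build("photoslibrary", "v1", credentials=creds,
#                     static_discovery=False)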


class GooglePhotosDownloader:
    """Downloads a Google Photos library into photos/<year>/<month> folders."""

    def __init__(self):
        self.base_path = "photos"
        self.metadata_path = os.path.join(self.base_path, "metadata")
        self.cache_file = os.path.join(self.base_path, "download_cache.pickle")
        Path(self.base_path).mkdir(exist_ok=True)
        Path(self.metadata_path).mkdir(exist_ok=True)
        # Guards read-modify-write updates to folder_info.json, which several
        # download threads may touch concurrently.
        self.folder_info_lock = threading.Lock()
        # Numeric prefixes keep the month folders sorted chronologically.
        self.months = {
            1: "01-Jan", 2: "02-Feb", 3: "03-Mar",
            4: "04-Apr", 5: "05-May", 6: "06-Jun",
            7: "07-July", 8: "08-Aug", 9: "09-Sept",
            10: "10-Oct", 11: "11-Nov", 12: "12-Dec"
        }

    def fetch_and_save_metadata(self):
        """Fetch all media items page by page and save their metadata."""
        items = []
        page_token = None
        while True:
            request_body = {"pageSize": 100}
            if page_token:
                request_body["pageToken"] = page_token
            response = service.mediaItems().list(**request_body).execute()
            current_items = response.get("mediaItems", [])
            items.extend(current_items)
            # Save metadata for the current batch
            for item in current_items:
                self._save_item_metadata(item)
            print(f"Fetched and saved metadata for {len(items)} items...")
            page_token = response.get("nextPageToken")
            if not page_token:
                break
        # Cache the full item list (including baseUrls) for the download step.
        # Google Photos baseUrls expire after roughly 60 minutes, so downloads
        # should run soon after this step.
        with open(self.cache_file, "wb") as f:
            pickle.dump(items, f)
        return len(items)
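
    # For reference, each cached media item is a dict shaped roughly like the
    # following (abridged; see the Google Photos Library API reference for the
    # full mediaItems schema):
    #   {
    #     "id": "...",
    #     "filename": "IMG_0001.jpg",
    #     "mimeType": "image/jpeg",
    #     "baseUrl": "https://lh3.googleusercontent.com/...",
    #     "mediaMetadata": {"creationTime": "2021-05-01T12:34:56Z", ...}
    #   }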

    def _save_item_metadata(self, item):
        """Save metadata for a single item and create its year/month folder."""
        creation_time_str = item["mediaMetadata"]["creationTime"]
        # Timestamps may or may not carry fractional seconds.
        try:
            creation_time = datetime.strptime(creation_time_str, "%Y-%m-%dT%H:%M:%S.%fZ")
        except ValueError:
            creation_time = datetime.strptime(creation_time_str, "%Y-%m-%dT%H:%M:%SZ")
        year = str(creation_time.year)
        month = self.months[creation_time.month]
        # Create the folder structure
        year_path = os.path.join(self.base_path, year)
        month_path = os.path.join(year_path, month)
        Path(year_path).mkdir(exist_ok=True)
        Path(month_path).mkdir(exist_ok=True)
        # Initialise folder metadata for tracking downloads, but only once --
        # rewriting it for every item would reset the counts.
        folder_info_path = os.path.join(month_path, "folder_info.json")
        if not os.path.exists(folder_info_path):
            folder_metadata = {
                "year": year,
                "month": month,
                "item_count": 0,
                "total_size": 0
            }
            with open(folder_info_path, "w") as f:
                json.dump(folder_metadata, f, indent=2)
        # Save the item's own metadata
        metadata = {
            "id": item["id"],
            "description": item.get("description", ""),
            "creationTime": item["mediaMetadata"]["creationTime"],
            "modificationTime": item["mediaMetadata"].get("modificationTime", None),
            "width": item["mediaMetadata"].get("width", ""),
            "height": item["mediaMetadata"].get("height", ""),
            "mimeType": item["mimeType"],
            "baseUrl": item["baseUrl"],
            "filename": item["filename"]
        }
        metadata_file = os.path.join(self.metadata_path, f"{item['id']}.json")
        with open(metadata_file, "w") as f:
            json.dump(metadata, f, indent=2)

    def download_all_media(self, max_threads=4, use_threading=True):
        """Download all media files using the cached metadata."""
        if not os.path.exists(self.cache_file):
            print("No cached metadata found. Run fetch_and_save_metadata first.")
            return
        with open(self.cache_file, "rb") as f:
            items = pickle.load(f)
        total = len(items)
        completed = 0
        # The counter is shared across worker threads, so guard it with a lock.
        progress_lock = threading.Lock()

        def download_with_progress(item):
            nonlocal completed
            try:
                self._download_single_item(item)
                with progress_lock:
                    completed += 1
                    print(f"Progress: {completed}/{total} items processed")
            except Exception as e:
                print(f"Error downloading {item.get('filename', 'unknown')}: {e}")

        if use_threading:
            with ThreadPoolExecutor(max_workers=max_threads) as executor:
                executor.map(download_with_progress, items)
        else:
            for item in items:
                download_with_progress(item)

    def _download_single_item(self, item):
        """Download a single media item and stamp it with its creation date."""
        timestamp_str = item["mediaMetadata"]["creationTime"]
        timestamp_str = timestamp_str.replace("Z", "").split(".")[0]
        creation_time = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S")
        # Build an ordinal day suffix: 1st, 2nd, 3rd, 4th, ... 11th, 12th, 13th
        day_ordinal = str(creation_time.day)
        if day_ordinal.endswith("1") and day_ordinal != "11":
            day_ordinal += "st"
        elif day_ordinal.endswith("2") and day_ordinal != "12":
            day_ordinal += "nd"
        elif day_ordinal.endswith("3") and day_ordinal != "13":
            day_ordinal += "rd"
        else:
            day_ordinal += "th"
        year = str(creation_time.year)
        month = self.months[creation_time.month]
        month_path = os.path.join(self.base_path, year, month)
        friendly_time = creation_time.strftime("%I-%M %p").lstrip("0")
        # month[3:] strips the numeric prefix, e.g. "01-Jan" -> "Jan"
        friendly_date = f"{day_ordinal} {month[3:]} {year} at {friendly_time}"
        filename = f"{friendly_date}_{item['filename']}"
        file_path = os.path.join(month_path, filename)
        if os.path.exists(file_path):
            print(f"Skipping existing file: {filename}")
            return
        base_url = item["baseUrl"]
        mime_type = item["mimeType"]
        # Appending "=d" to a baseUrl requests the image bytes; "=dv" requests
        # the video bytes.
        download_url = f"{base_url}=d" if "image" in mime_type else f"{base_url}=dv"
        # print(f"Download URL: {download_url}")
        print(f"Downloading: {filename}")
        response = requests.get(download_url, timeout=120)
        if response.status_code == 200:
            with open(file_path, "wb") as f:
                f.write(response.content)
            # Update folder metadata under the lock; several worker threads may
            # update the same folder_info.json at once.
            folder_info_path = os.path.join(month_path, "folder_info.json")
            with self.folder_info_lock:
                with open(folder_info_path, "r") as f:
                    folder_info = json.load(f)
                folder_info["item_count"] += 1
                folder_info["total_size"] += len(response.content)
                with open(folder_info_path, "w") as f:
                    json.dump(folder_info, f, indent=2)
            # Stamp the file's created/modified times with the capture time.
            filedate.File(file_path).set(
                created=creation_time,
                modified=creation_time,
            )
        else:
            print(f"Failed to download {filename} (HTTP {response.status_code})")


def main():
    downloader = GooglePhotosDownloader()
    # Half the CPU count is a conservative default for these I/O-bound downloads.
    cpu_count = multiprocessing.cpu_count()
    recommended_threads = max(1, cpu_count // 2)
    print("Step 1: Fetching and saving metadata...")
    total_items = downloader.fetch_and_save_metadata()
    print(f"Metadata saved for {total_items} items")
    print(f"\nStep 2: Downloading media files using {recommended_threads} threads...")
    downloader.download_all_media(max_threads=recommended_threads)
    print("Download complete!")


if __name__ == "__main__":
    main()
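
# Usage sketch (assumes OAuth client credentials are already configured for
# init_photo_service; the package names below are the likely dependencies,
# not pinned versions):
#   $ pip install requests filedate google-api-python-client
#   $ python app.py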