From 01e75542d78e7fbcb08cab59b84a931f1a5ad710 Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Thu, 10 Oct 2024 08:31:14 -0500
Subject: [PATCH 01/41] working on better downloads

---
 src/burst2safe/burst2safe.py |   3 +-
 src/burst2safe/download.py   | 106 +++++++++++++++++++++++++++++++++++
 2 files changed, 108 insertions(+), 1 deletion(-)
 create mode 100644 src/burst2safe/download.py

diff --git a/src/burst2safe/burst2safe.py b/src/burst2safe/burst2safe.py
index ccab068..ca44999 100644
--- a/src/burst2safe/burst2safe.py
+++ b/src/burst2safe/burst2safe.py
@@ -8,7 +8,8 @@
 
 from burst2safe import utils
 from burst2safe.safe import Safe
-from burst2safe.search import download_bursts, find_bursts
+from burst2safe.search import find_bursts
+from burst2safe.download import download_bursts
 
 
 DESCRIPTION = """Convert a set of ASF burst SLCs to the ESA SAFE format.
diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py
new file mode 100644
index 0000000..6161817
--- /dev/null
+++ b/src/burst2safe/download.py
@@ -0,0 +1,106 @@
+from concurrent.futures import ThreadPoolExecutor
+from multiprocessing import cpu_count
+from pathlib import Path
+from typing import Iterable, Optional
+
+import asf_search
+import requests
+from tenacity import retry, retry_if_result, stop_after_delay, wait_fixed, wait_random
+from tqdm.contrib.concurrent import process_map
+
+from burst2safe.auth import get_earthdata_credentials
+from burst2safe.utils import BurstInfo
+
+
+def try_get_response(username: str, password: str, url: str) -> requests.Response:
+    session = asf_search.ASFSession().auth_with_creds(username, password)
+    response = session.get(url, stream=True, hooks={'response': asf_search.download.strip_auth_if_aws})
+
+    try:
+        response.raise_for_status()
+    except requests.exceptions.HTTPError as e:
+        if 400 <= response.status_code <= 499:
+            raise asf_search.exceptions.ASFAuthenticationError(f'HTTP {e.response.status_code}: {e.response.text}')
+        raise e
+
+    return response
+
+
+@retry(
+    reraise=True,
+    retry=retry_if_result(lambda r: r.status_code == 202),
+    wait=wait_fixed(0.5) + wait_random(0, 1),
+    stop=stop_after_delay(120),
+)
+def retry_get_response(username: str, password: str, url: str) -> requests.Response:
+    response = try_get_response(username, password, url)
+    return response
+
+
+def download_url(username: str, password: str, url: str, file_path: Path) -> None:
+    """
+    Downloads a product from the specified URL to the specified file path.
+
+    Args:
+        url: The URL to download
+        file_path: The path to save the file
+        username: The username to use for the download
+        password: The password to use for the download
+    """
+    if not file_path.parent.exists():
+        raise asf_search.exceptions.ASFDownloadError(
+            f'Error downloading {url}: directory not found: {file_path.parent}'
+        )
+
+    response = retry_get_response(username=username, password=password, url=url)
+
+    with open(file_path, 'wb') as f:
+        for chunk in response.iter_content(chunk_size=8192):
+            f.write(chunk)
+
+
+def download_bursts(burst_infos: Iterable[BurstInfo], n_threads: Optional[int] = None, force: bool = False) -> None:
+    """Download the burst data and metadata files using multiple workers.
+
+    Args:
+        burst_infos: A list of BurstInfo objects
+        n_threads: Number of threads to use for downloading
+        force: If True, download the files even if they already exist
+    """
+    tiffs = {}
+    xmls = {}
+    for burst_info in burst_infos:
+        if force or not burst_info.data_path.exists():
+            tiffs[burst_info.data_path] = burst_info.data_url
+
+        if force or not burst_info.metadata_path.exists():
+            xmls[burst_info.metadata_path] = burst_info.metadata_url
+
+    all_data = {**tiffs, **xmls}
+    if len(all_data) == 0:
+        print('All files already exist. Skipping download.')
+        return
+
+    username, password = get_earthdata_credentials()
+    n_threads = min(len(all_data), cpu_count() + 4) if n_threads is None else n_threads
+
+    # Submit one request per file to start extraction of burst data
+    username_list = [username] * len(all_data)
+    password_list = [password] * len(all_data)
+    # with ThreadPoolExecutor(max_workers=n_threads) as executor:
+    with ThreadPoolExecutor(max_workers=n_threads):
+        process_map(try_get_response, username_list, password_list, all_data.values())
+
+    print('Downloading metadata...')
+    username_list = [username] * len(xmls)
+    password_list = [password] * len(xmls)
+    with ThreadPoolExecutor(max_workers=n_threads):
+        process_map(download_url, username_list, password_list, xmls.values(), xmls.keys())
+    # [download_url(u, p, url, file) for u, p, url, file in zip(username_list, password_list, xmls.values(), xmls.keys())]
+
+    print('Downloading data...')
+    username_list = [username] * len(xmls)
+    password_list = [password] * len(xmls)
+    with ThreadPoolExecutor(max_workers=n_threads):
+        process_map(download_url, username_list, password_list, tiffs.values(), tiffs.keys())
+    # [download_url(u, p, url, file) for u, p, url, file in zip(username_list, password_list, tiffs.values(), tiffs.keys())]

From 33c453b26c9b1588e1cb7e692a4ae236ac9556bf Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Fri, 11 Oct 2024 07:55:40 -0500
Subject: [PATCH 02/41] working async

---
 src/burst2safe/download.py | 147 ++++++++++++++++++++++++++-----------
 1 file changed, 103 insertions(+), 44 deletions(-)

diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py
index 6161817..13ef90a 100644
--- a/src/burst2safe/download.py
+++ b/src/burst2safe/download.py
@@ -1,10 +1,16 @@
+import asyncio
+import os
 from concurrent.futures import ThreadPoolExecutor
 from multiprocessing import cpu_count
 from pathlib import Path
 from typing import Iterable, Optional
 
+import aiohttp
 import asf_search
 import requests
+from asf_search import ASFAuthenticationError
+from asf_search.ASFSession import ASFSession
+from requests.exceptions import HTTPError
 from tenacity import retry, retry_if_result, stop_after_delay, wait_fixed, wait_random
 from tqdm.contrib.concurrent import process_map
 
@@ -12,15 +18,14 @@
 from burst2safe.utils import BurstInfo
 
 
-def try_get_response(username: str, password: str, url: str) -> requests.Response:
-    session = asf_search.ASFSession().auth_with_creds(username, password)
+def try_get_response(session: ASFSession, url: str):
     response = session.get(url, stream=True, hooks={'response': asf_search.download.strip_auth_if_aws})
 
     try:
         response.raise_for_status()
-    except requests.exceptions.HTTPError as e:
+    except HTTPError as e:
         if 400 <= response.status_code <= 499:
-            raise asf_search.exceptions.ASFAuthenticationError(f'HTTP {e.response.status_code}: {e.response.text}')
+            raise ASFAuthenticationError(f'HTTP {e.response.status_code}: {e.response.text}')
         raise e
 
     return response
@@ -32,41 +37,32 @@ def 
try_get_response(username: str, password: str, url: str) -> requests.Respons wait=wait_fixed(0.5) + wait_random(0, 1), stop=stop_after_delay(120), ) -def retry_get_response(username: str, password: str, url: str) -> requests.Response: - response = try_get_response(username, password, url) - return response - +def retry_get_response(session: ASFSession, url: str) -> requests.Response: + return try_get_response(session, url) -def download_url(username: str, password: str, url: str, file_path: Path) -> None: - """ - Downloads a product from the specified URL to the specified file path. - Args: - url: The URL to download - file_path: The path to save the file - username: The username to use for the download - password: The password to use for the download - """ +def download_response(response: requests.Response, file_path: Path) -> None: if not file_path.parent.exists(): - raise asf_search.exceptions.ASFDownloadError( - f'Error downloading {url}: directory not found: {file_path.parent}' - ) - - response = retry_get_response(username=username, password=password, url=url) - + raise asf_search.exceptions.ASFDownloadError(f'Error downloading, directory not found: {file_path.parent}') with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) -def download_bursts(burst_infos: Iterable[BurstInfo], n_threads: Optional[int] = None, force: bool = False) -> None: - """Download the burst data and metadata files using multiple workers. +def download_url(session: ASFSession, url: str, file_path: Path) -> None: + """ + Downloads a product from the specified URL to the specified file path. Args: - burst_infos: A list of BurstInfo objects - n_threads: Number of threads to use for downloading - force: If True, download the files even if they already exist + session: The ASF session to use for downloading + url: The URL to download + file_path: The path to save the file """ + response = retry_get_response(session, url=url) + download_response(response, file_path) + + +def get_url_dict(burst_infos: Iterable[BurstInfo], force: bool = False) -> dict: tiffs = {} xmls = {} for burst_info in burst_infos: @@ -75,32 +71,95 @@ def download_bursts(burst_infos: Iterable[BurstInfo], n_threads: Optional[int] = if force or not burst_info.metadata_path.exists(): xmls[burst_info.metadata_path] = burst_info.metadata_url + return tiffs, xmls + +def download_bursts_thread( + burst_infos: Iterable[BurstInfo], max_threads: Optional[int] = None, force: bool = False +) -> None: + """Download the burst data and metadata files using multiple workers. + + Args: + burst_infos: A list of BurstInfo objects + n_threads: Number of threads to use for downloading + force: If True, download the files even if they already exist + """ + tiffs, xmls = get_url_dict(burst_infos, force) all_data = {**tiffs, **xmls} if len(all_data) == 0: print('All files already exist. 
Skipping download.') return username, password = get_earthdata_credentials() - n_threads = min(len(all_data), cpu_count() + 4) if n_threads is None else n_threads + sess = asf_search.ASFSession().auth_with_creds(username, password) + # max_threads = min(len(all_data), cpu_count() + 4) if max_threads is None else max_threads + max_threads = min(cpu_count() + 4 if max_threads is None else max_threads, len(all_data)) # Submit one request per file to start extraction of burst data - username_list = [username] * len(all_data) - password_list = [password] * len(all_data) - # with ThreadPoolExecutor(max_workers=n_threads) as executor: - with ThreadPoolExecutor(max_workers=n_threads): - process_map(try_get_response, username_list, password_list, all_data.values()) + with ThreadPoolExecutor(max_workers=max_threads): + process_map(try_get_response, [sess] * len(all_data), all_data.values()) print('Downloading metadata...') - username_list = [username] * len(xmls) - password_list = [password] * len(xmls) - with ThreadPoolExecutor(max_workers=n_threads): - process_map(download_url, username_list, password_list, xmls.values(), xmls.keys()) - # [download_url(u, p, url, file) for u, p, url, file in zip(username_list, password_list, xmls.values(), xmls.keys())] + with ThreadPoolExecutor(max_workers=min(max_threads, len(xmls))): + process_map(download_url, [sess] * len(xmls), xmls.values(), xmls.keys()) print('Downloading data...') - username_list = [username] * len(xmls) - password_list = [password] * len(xmls) - with ThreadPoolExecutor(max_workers=n_threads): - process_map(download_url, username_list, password_list, tiffs.values(), tiffs.keys()) - # [download_url(u, p, url, file) for u, p, url, file in zip(username_list, password_list, tiffs.values(), tiffs.keys())] + with ThreadPoolExecutor(max_workers=min(max_threads, len(tiffs))): + process_map(download_url, [sess] * len(tiffs), tiffs.values(), tiffs.keys()) + + +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# +@retry( + reraise=True, + retry=retry_if_result(lambda r: r.status == 202), + wait=wait_fixed(0.5) + wait_random(0, 1), + stop=stop_after_delay(120), +) +async def retry_get_response_async(session, url): + response = await session.get(url) + response.raise_for_status() + return response + + +async def download_response_async(response, file_path: Path) -> None: + if not file_path.parent.exists(): + raise ValueError(f'Error downloading, directory not found: {file_path.parent}') + + with open(file_path, 'wb') as f: + async for chunk in response.content.iter_chunked(8192): + f.write(chunk) + + +async def download_producer(url_dict, session, queue): + print('Producer: Running') + for path, url in url_dict.items(): + response = await retry_get_response_async(session, url=url) + await queue.put((response, path)) + await queue.put((None, None)) + print('Producer: Done') + + +async def download_consumer(queue): + print('Consumer: Running') + while True: + response, path = await queue.get() + if path is None: + break + print(f'Downloading {path}') + await download_response_async(response, path) + print('Consumer: Done') + + +async def download_async(url_dict, token) -> None: + queue = asyncio.Queue() + headers = {'Authorization': f'Bearer {token}'} + async with aiohttp.ClientSession(headers=headers, trust_env=True) as session: + await asyncio.gather(download_producer(url_dict, session, queue), download_consumer(queue)) + + +def download_bursts(burst_infos: Iterable[BurstInfo]): + tiffs, xmls = get_url_dict(burst_infos) + 
username, password = get_earthdata_credentials() + token = os.getenv('EDL_TOKEN') + token = aiohttp.BasicAuth(username, password) + asyncio.run(download_async({**tiffs, **xmls}, token)) From 5f2fb83da8ab432d9d67315b2776cd8d040cb306 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Fri, 11 Oct 2024 07:57:22 -0500 Subject: [PATCH 03/41] remove unused code --- src/burst2safe/download.py | 89 +------------------------------------- 1 file changed, 1 insertion(+), 88 deletions(-) diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py index 13ef90a..1117620 100644 --- a/src/burst2safe/download.py +++ b/src/burst2safe/download.py @@ -1,67 +1,15 @@ import asyncio import os -from concurrent.futures import ThreadPoolExecutor -from multiprocessing import cpu_count from pathlib import Path -from typing import Iterable, Optional +from typing import Iterable import aiohttp -import asf_search -import requests -from asf_search import ASFAuthenticationError -from asf_search.ASFSession import ASFSession -from requests.exceptions import HTTPError from tenacity import retry, retry_if_result, stop_after_delay, wait_fixed, wait_random -from tqdm.contrib.concurrent import process_map from burst2safe.auth import get_earthdata_credentials from burst2safe.utils import BurstInfo -def try_get_response(session: ASFSession, url: str): - response = session.get(url, stream=True, hooks={'response': asf_search.download.strip_auth_if_aws}) - - try: - response.raise_for_status() - except HTTPError as e: - if 400 <= response.status_code <= 499: - raise ASFAuthenticationError(f'HTTP {e.response.status_code}: {e.response.text}') - raise e - - return response - - -@retry( - reraise=True, - retry=retry_if_result(lambda r: r.status_code == 202), - wait=wait_fixed(0.5) + wait_random(0, 1), - stop=stop_after_delay(120), -) -def retry_get_response(session: ASFSession, url: str) -> requests.Response: - return try_get_response(session, url) - - -def download_response(response: requests.Response, file_path: Path) -> None: - if not file_path.parent.exists(): - raise asf_search.exceptions.ASFDownloadError(f'Error downloading, directory not found: {file_path.parent}') - with open(file_path, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - - -def download_url(session: ASFSession, url: str, file_path: Path) -> None: - """ - Downloads a product from the specified URL to the specified file path. - - Args: - session: The ASF session to use for downloading - url: The URL to download - file_path: The path to save the file - """ - response = retry_get_response(session, url=url) - download_response(response, file_path) - - def get_url_dict(burst_infos: Iterable[BurstInfo], force: bool = False) -> dict: tiffs = {} xmls = {} @@ -74,41 +22,6 @@ def get_url_dict(burst_infos: Iterable[BurstInfo], force: bool = False) -> dict: return tiffs, xmls -def download_bursts_thread( - burst_infos: Iterable[BurstInfo], max_threads: Optional[int] = None, force: bool = False -) -> None: - """Download the burst data and metadata files using multiple workers. - - Args: - burst_infos: A list of BurstInfo objects - n_threads: Number of threads to use for downloading - force: If True, download the files even if they already exist - """ - tiffs, xmls = get_url_dict(burst_infos, force) - all_data = {**tiffs, **xmls} - if len(all_data) == 0: - print('All files already exist. 
Skipping download.') - return - - username, password = get_earthdata_credentials() - sess = asf_search.ASFSession().auth_with_creds(username, password) - # max_threads = min(len(all_data), cpu_count() + 4) if max_threads is None else max_threads - max_threads = min(cpu_count() + 4 if max_threads is None else max_threads, len(all_data)) - - # Submit one request per file to start extraction of burst data - with ThreadPoolExecutor(max_workers=max_threads): - process_map(try_get_response, [sess] * len(all_data), all_data.values()) - - print('Downloading metadata...') - with ThreadPoolExecutor(max_workers=min(max_threads, len(xmls))): - process_map(download_url, [sess] * len(xmls), xmls.values(), xmls.keys()) - - print('Downloading data...') - with ThreadPoolExecutor(max_workers=min(max_threads, len(tiffs))): - process_map(download_url, [sess] * len(tiffs), tiffs.values(), tiffs.keys()) - - -# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# @retry( reraise=True, retry=retry_if_result(lambda r: r.status == 202), From de0b6b8c79e436618e8594242f201fd64ac28029 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Fri, 11 Oct 2024 13:30:18 -0500 Subject: [PATCH 04/41] working with .netrc --- src/burst2safe/download.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py index 1117620..991da78 100644 --- a/src/burst2safe/download.py +++ b/src/burst2safe/download.py @@ -1,12 +1,11 @@ import asyncio -import os from pathlib import Path from typing import Iterable import aiohttp from tenacity import retry, retry_if_result, stop_after_delay, wait_fixed, wait_random +from tqdm.asyncio import tqdm -from burst2safe.auth import get_earthdata_credentials from burst2safe.utils import BurstInfo @@ -39,7 +38,7 @@ async def download_response_async(response, file_path: Path) -> None: raise ValueError(f'Error downloading, directory not found: {file_path.parent}') with open(file_path, 'wb') as f: - async for chunk in response.content.iter_chunked(8192): + async for chunk in response.content.iter_chunked(2**20): f.write(chunk) @@ -58,21 +57,17 @@ async def download_consumer(queue): response, path = await queue.get() if path is None: break - print(f'Downloading {path}') + print(f'Downloading {path.name}...') await download_response_async(response, path) print('Consumer: Done') -async def download_async(url_dict, token) -> None: +async def download_async(url_dict) -> None: queue = asyncio.Queue() - headers = {'Authorization': f'Bearer {token}'} - async with aiohttp.ClientSession(headers=headers, trust_env=True) as session: - await asyncio.gather(download_producer(url_dict, session, queue), download_consumer(queue)) + async with aiohttp.ClientSession(trust_env=True) as session: + await tqdm.gather(download_producer(url_dict, session, queue), download_consumer(queue)) def download_bursts(burst_infos: Iterable[BurstInfo]): tiffs, xmls = get_url_dict(burst_infos) - username, password = get_earthdata_credentials() - token = os.getenv('EDL_TOKEN') - token = aiohttp.BasicAuth(username, password) - asyncio.run(download_async({**tiffs, **xmls}, token)) + asyncio.run(download_async({**tiffs, **xmls})) From feb2152810cc6853a9b6cafc6ef80b29c9357bc5 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Fri, 11 Oct 2024 14:47:17 -0500 Subject: [PATCH 05/41] little better --- src/burst2safe/download.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/burst2safe/download.py 
b/src/burst2safe/download.py index 991da78..f6e3bc3 100644 --- a/src/burst2safe/download.py +++ b/src/burst2safe/download.py @@ -4,7 +4,6 @@ import aiohttp from tenacity import retry, retry_if_result, stop_after_delay, wait_fixed, wait_random -from tqdm.asyncio import tqdm from burst2safe.utils import BurstInfo @@ -38,36 +37,34 @@ async def download_response_async(response, file_path: Path) -> None: raise ValueError(f'Error downloading, directory not found: {file_path.parent}') with open(file_path, 'wb') as f: - async for chunk in response.content.iter_chunked(2**20): + async for chunk in response.content.iter_chunked(2**14): f.write(chunk) async def download_producer(url_dict, session, queue): - print('Producer: Running') for path, url in url_dict.items(): response = await retry_get_response_async(session, url=url) await queue.put((response, path)) await queue.put((None, None)) - print('Producer: Done') async def download_consumer(queue): - print('Consumer: Running') while True: response, path = await queue.get() if path is None: break - print(f'Downloading {path.name}...') await download_response_async(response, path) - print('Consumer: Done') async def download_async(url_dict) -> None: queue = asyncio.Queue() async with aiohttp.ClientSession(trust_env=True) as session: - await tqdm.gather(download_producer(url_dict, session, queue), download_consumer(queue)) + await asyncio.gather(download_producer(url_dict, session, queue), download_consumer(queue)) def download_bursts(burst_infos: Iterable[BurstInfo]): tiffs, xmls = get_url_dict(burst_infos) asyncio.run(download_async({**tiffs, **xmls})) + missing_data = [x for x in {**tiffs, **xmls}.keys() if not x.exists] + if missing_data: + raise ValueError(f'Error downloading, missing files: {", ".join(missing_data)}') From 215c1ca7f8ac8d01395d0f2c417aeba3e6d2e40a Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Mon, 14 Oct 2024 07:16:20 -0500 Subject: [PATCH 06/41] improve names --- src/burst2safe/download.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py index f6e3bc3..0339416 100644 --- a/src/burst2safe/download.py +++ b/src/burst2safe/download.py @@ -41,14 +41,14 @@ async def download_response_async(response, file_path: Path) -> None: f.write(chunk) -async def download_producer(url_dict, session, queue): +async def response_producer(url_dict, session, queue): for path, url in url_dict.items(): response = await retry_get_response_async(session, url=url) await queue.put((response, path)) await queue.put((None, None)) -async def download_consumer(queue): +async def response_consumer(queue): while True: response, path = await queue.get() if path is None: @@ -59,7 +59,7 @@ async def download_consumer(queue): async def download_async(url_dict) -> None: queue = asyncio.Queue() async with aiohttp.ClientSession(trust_env=True) as session: - await asyncio.gather(download_producer(url_dict, session, queue), download_consumer(queue)) + await asyncio.gather(response_producer(url_dict, session, queue), response_consumer(queue)) def download_bursts(burst_infos: Iterable[BurstInfo]): @@ -67,4 +67,4 @@ def download_bursts(burst_infos: Iterable[BurstInfo]): asyncio.run(download_async({**tiffs, **xmls})) missing_data = [x for x in {**tiffs, **xmls}.keys() if not x.exists] if missing_data: - raise ValueError(f'Error downloading, missing files: {", ".join(missing_data)}') + raise ValueError(f'Error downloading, missing files: {", ".join(missing_data.name)}') From 
6c968de60e0e8052318e0e738f33669c5ceaa0ba Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Mon, 14 Oct 2024 07:26:27 -0500 Subject: [PATCH 07/41] cleanup --- environment.yml | 1 + pyproject.toml | 1 + src/burst2safe/auth.py | 35 +++++++++++++++++++++++++---------- src/burst2safe/download.py | 2 ++ src/burst2safe/search.py | 8 ++++---- 5 files changed, 33 insertions(+), 14 deletions(-) diff --git a/environment.yml b/environment.yml index fa6a571..deb2be0 100644 --- a/environment.yml +++ b/environment.yml @@ -12,6 +12,7 @@ dependencies: - tifffile>=2022.04.22 - asf_search - dateparser!=1.1.0 + - aiohttp # For packaging, and testing - pytest - pytest-cov diff --git a/pyproject.toml b/pyproject.toml index 6507d0d..d49babb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ dependencies = [ "tifffile>=2022.04.22", "asf_search", "dateparser!=1.1.0", + "aiohttp", ] [project.urls] diff --git a/src/burst2safe/auth.py b/src/burst2safe/auth.py index b42b859..41a9778 100644 --- a/src/burst2safe/auth.py +++ b/src/burst2safe/auth.py @@ -57,21 +57,36 @@ def find_creds_in_netrc(service) -> Tuple[str, str]: return None, None -def get_earthdata_credentials() -> Tuple[str, str]: - """Get NASA EarthData credentials from the environment or netrc file. +def write_credentials_to_netrc_file(username: str, password: str) -> None: + """Write credentials to netrc file - Returns: - Tuple of the NASA EarthData username and password + Args: + username: NASA EarthData username + password: NASA EarthData password """ - username, password = find_creds_in_env('EARTHDATA_USERNAME', 'EARTHDATA_PASSWORD') - if username and password: - return username, password + netrc_file = get_netrc() + if not netrc_file.exists(): + netrc_file.touch() + + with open(netrc_file, 'a') as f: + f.write(f'machine {EARTHDATA_HOST} login {username} password {password}\n') + +def check_earthdata_credentials() -> None: + """Check for NASA EarthData credentials in the netrc file or environment variables. + + Will preferentially use the netrc file, and write credentials to the netrc file if found in the environment. + """ username, password = find_creds_in_netrc(EARTHDATA_HOST) if username and password: - return username, password + return + + username, password = find_creds_in_env('EARTHDATA_USERNAME', 'EARTHDATA_PASSWORD') + if username and password: + write_credentials_to_netrc_file(username, password) + return raise ValueError( - 'Please provide NASA EarthData credentials via the ' - 'EARTHDATA_USERNAME and EARTHDATA_PASSWORD environment variables, or your netrc file.' + 'Please provide NASA Earthdata credentials via your .netrc file,' + 'or the EARTHDATA_USERNAME and EARTHDATA_PASSWORD environment variables.' 
) diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py index 0339416..55bf773 100644 --- a/src/burst2safe/download.py +++ b/src/burst2safe/download.py @@ -6,6 +6,7 @@ from tenacity import retry, retry_if_result, stop_after_delay, wait_fixed, wait_random from burst2safe.utils import BurstInfo +from burst2safe.auth import check_earthdata_credentials def get_url_dict(burst_infos: Iterable[BurstInfo], force: bool = False) -> dict: @@ -64,6 +65,7 @@ async def download_async(url_dict) -> None: def download_bursts(burst_infos: Iterable[BurstInfo]): tiffs, xmls = get_url_dict(burst_infos) + check_earthdata_credentials() asyncio.run(download_async({**tiffs, **xmls})) missing_data = [x for x in {**tiffs, **xmls}.keys() if not x.exists] if missing_data: diff --git a/src/burst2safe/search.py b/src/burst2safe/search.py index 20ac95f..233a6c2 100644 --- a/src/burst2safe/search.py +++ b/src/burst2safe/search.py @@ -13,7 +13,7 @@ from asf_search.Products.S1BurstProduct import S1BurstProduct from shapely.geometry import Polygon -from burst2safe.auth import get_earthdata_credentials +from burst2safe.auth import check_earthdata_credentials from burst2safe.utils import BurstInfo, download_url_with_retries @@ -227,9 +227,9 @@ def download_bursts(burst_infos: Iterable[BurstInfo]) -> None: downloads[burst_info.metadata_path] = burst_info.metadata_url download_info = [(value, key.parent, key.name) for key, value in downloads.items()] urls, dirs, names = zip(*download_info) - - username, password = get_earthdata_credentials() - session = asf_search.ASFSession().auth_with_creds(username, password) + + check_earthdata_credentials() + session = asf_search.ASFSession() n_workers = min(len(urls), max(cpu_count() - 2, 1)) if n_workers == 1: for url, dir, name in zip(urls, dirs, names): From be360e7a66667e1a1593be330b4c4eee6383a499 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Mon, 14 Oct 2024 07:26:44 -0500 Subject: [PATCH 08/41] finish cleanup --- src/burst2safe/download.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py index 55bf773..67388d8 100644 --- a/src/burst2safe/download.py +++ b/src/burst2safe/download.py @@ -5,8 +5,8 @@ import aiohttp from tenacity import retry, retry_if_result, stop_after_delay, wait_fixed, wait_random -from burst2safe.utils import BurstInfo from burst2safe.auth import check_earthdata_credentials +from burst2safe.utils import BurstInfo def get_url_dict(burst_infos: Iterable[BurstInfo], force: bool = False) -> dict: From f4927e017bf75ec03fb11e332f6f69f41c12f883 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Mon, 14 Oct 2024 07:34:37 -0500 Subject: [PATCH 09/41] add docstrings --- src/burst2safe/download.py | 58 ++++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 6 deletions(-) diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py index 67388d8..bd4c3f6 100644 --- a/src/burst2safe/download.py +++ b/src/burst2safe/download.py @@ -10,6 +10,15 @@ def get_url_dict(burst_infos: Iterable[BurstInfo], force: bool = False) -> dict: + """Get a dictionary of URLs to download. Keys are save paths, and values are download URLs. 
+ + Args: + burst_infos: A list of BurstInfo objects + force: If True, download even if the file already exists + + Returns: + A dictionary of URLs to download + """ tiffs = {} xmls = {} for burst_info in burst_infos: @@ -27,13 +36,28 @@ def get_url_dict(burst_infos: Iterable[BurstInfo], force: bool = False) -> dict: wait=wait_fixed(0.5) + wait_random(0, 1), stop=stop_after_delay(120), ) -async def retry_get_response_async(session, url): +async def retry_get_response_async(session: aiohttp.ClientSession, url: str) -> aiohttp.ClientResponse: + """Retry a GET request until a non-202 response is received. + + Args: + session: An aiohttp ClientSession + url: The URL to GET + + Returns: + An aiohttp ClientResponse + """ response = await session.get(url) response.raise_for_status() return response -async def download_response_async(response, file_path: Path) -> None: +async def download_response_async(response: aiohttp.ClientResponse, file_path: Path) -> None: + """Download the response content to a file. + + Args: + response: An aiohttp ClientResponse + file_path: The path to save the response content to + """ if not file_path.parent.exists(): raise ValueError(f'Error downloading, directory not found: {file_path.parent}') @@ -42,14 +66,26 @@ async def download_response_async(response, file_path: Path) -> None: f.write(chunk) -async def response_producer(url_dict, session, queue): +async def response_producer(url_dict: dict, session: aiohttp.ClientSession, queue: asyncio.Queue) -> None: + """Produce responses to download and put them in a queue. + + Args: + url_dict: A dictionary of URLs to download + session: An aiohttp ClientSession + queue: An asyncio Queue + """ for path, url in url_dict.items(): response = await retry_get_response_async(session, url=url) await queue.put((response, path)) await queue.put((None, None)) -async def response_consumer(queue): +async def response_consumer(queue: asyncio.Queue) -> None: + """Consume responses from a queue and download them. + + Args: + queue: An asyncio Queue + """ while True: response, path = await queue.get() if path is None: @@ -57,13 +93,23 @@ async def response_consumer(queue): await download_response_async(response, path) -async def download_async(url_dict) -> None: +async def download_async(url_dict: dict) -> None: + """Download a dictionary of URLs asynchronously. + + Args: + url_dict: A dictionary of URLs to download + """ queue = asyncio.Queue() async with aiohttp.ClientSession(trust_env=True) as session: await asyncio.gather(response_producer(url_dict, session, queue), response_consumer(queue)) -def download_bursts(burst_infos: Iterable[BurstInfo]): +def download_bursts(burst_infos: Iterable[BurstInfo]) -> None: + """Download the burst data and metadata files using an async queue. 
+ + Args: + burst_infos: A list of BurstInfo objects + """ tiffs, xmls = get_url_dict(burst_infos) check_earthdata_credentials() asyncio.run(download_async({**tiffs, **xmls})) From 5a2238afbca923d03385cf5319127bb45009fcc7 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Mon, 14 Oct 2024 08:34:27 -0500 Subject: [PATCH 10/41] start work on search --- src/burst2safe/search.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/burst2safe/search.py b/src/burst2safe/search.py index 233a6c2..2c64241 100644 --- a/src/burst2safe/search.py +++ b/src/burst2safe/search.py @@ -137,6 +137,7 @@ def find_group( swaths: Optional[Iterable] = None, mode: str = 'IW', min_bursts: int = 1, + orbit_is_relative=False, ) -> List[S1BurstProduct]: """Find burst groups using ASF Search. @@ -147,6 +148,7 @@ def find_group( swaths: List of swaths to include (default: all) mode: The collection mode to use (IW or EW) (default: IW) min_bursts: The minimum number of bursts per swath (default: 1) + orbit_is_relative: Whether the orbit number is relative or absolute (default: False) Returns: A list of S1BurstProduct objects @@ -171,10 +173,9 @@ def find_group( if bad_swaths: raise ValueError(f'Invalid swaths: {" ".join(bad_swaths)}') - dataset = asf_search.constants.DATASET.SLC_BURST - search_results = asf_search.geo_search( - dataset=dataset, absoluteOrbit=orbit, intersectsWith=footprint.wkt, beamMode=mode - ) + opts = {'dataset': asf_search.constants.DATASET.SLC_BURST, 'intersectsWith': footprint.wkt, 'beamMode': mode} + opts['relativeOrbit' if orbit_is_relative else 'absoluteOrbit'] = orbit + search_results = asf_search.geo_search(**opts) final_results = [] for pol, swath in product(polarizations, swaths): sub_results = find_swath_pol_group(search_results, pol, swath, min_bursts) @@ -227,7 +228,7 @@ def download_bursts(burst_infos: Iterable[BurstInfo]) -> None: downloads[burst_info.metadata_path] = burst_info.metadata_url download_info = [(value, key.parent, key.name) for key, value in downloads.items()] urls, dirs, names = zip(*download_info) - + check_earthdata_credentials() session = asf_search.ASFSession() n_workers = min(len(urls), max(cpu_count() - 2, 1)) From 807ab68b315771670bd04330f60d6f971f89c1e8 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Mon, 14 Oct 2024 09:46:35 -0500 Subject: [PATCH 11/41] undo search changes --- src/burst2safe/search.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/burst2safe/search.py b/src/burst2safe/search.py index 2c64241..c711db0 100644 --- a/src/burst2safe/search.py +++ b/src/burst2safe/search.py @@ -137,7 +137,6 @@ def find_group( swaths: Optional[Iterable] = None, mode: str = 'IW', min_bursts: int = 1, - orbit_is_relative=False, ) -> List[S1BurstProduct]: """Find burst groups using ASF Search. 
 
@@ -148,7 +147,6 @@ def find_group(
         swaths: List of swaths to include (default: all)
         mode: The collection mode to use (IW or EW) (default: IW)
         min_bursts: The minimum number of bursts per swath (default: 1)
-        orbit_is_relative: Whether the orbit number is relative or absolute (default: False)
 
     Returns:
         A list of S1BurstProduct objects
@@ -171,9 +171,10 @@ def find_group(
     if bad_swaths:
         raise ValueError(f'Invalid swaths: {" ".join(bad_swaths)}')
 
-    opts = {'dataset': asf_search.constants.DATASET.SLC_BURST, 'intersectsWith': footprint.wkt, 'beamMode': mode}
-    opts['relativeOrbit' if orbit_is_relative else 'absoluteOrbit'] = orbit
-    search_results = asf_search.geo_search(**opts)
+    dataset = asf_search.constants.DATASET.SLC_BURST
+    search_results = asf_search.geo_search(
+        dataset=dataset, absoluteOrbit=orbit, intersectsWith=footprint.wkt, beamMode=mode
+    )
     final_results = []
     for pol, swath in product(polarizations, swaths):
         sub_results = find_swath_pol_group(search_results, pol, swath, min_bursts)

From ead30ddc89c13dd89a4d70df3de4b0439d33f62d Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Tue, 15 Oct 2024 08:05:58 -0500
Subject: [PATCH 12/41] working async thread download

---
 src/burst2safe/burst2stack.py | 66 ++++++++++++++++++++++++-----------
 src/burst2safe/download.py    | 46 ++++++++++++++++++------
 src/burst2safe/search.py      | 59 ++++++++++++++++++++++---------
 3 files changed, 124 insertions(+), 47 deletions(-)

diff --git a/src/burst2safe/burst2stack.py b/src/burst2safe/burst2stack.py
index 64757e8..1d79ea2 100644
--- a/src/burst2safe/burst2stack.py
+++ b/src/burst2safe/burst2stack.py
@@ -3,13 +3,14 @@
 from argparse import ArgumentParser
 from datetime import datetime
 from pathlib import Path
-from typing import Iterable, Optional
+from typing import Iterable, List, Optional
 
 from shapely.geometry import Polygon
 
 from burst2safe import utils
-from burst2safe.burst2safe import burst2safe
-from burst2safe.search import find_stack_orbits
+from burst2safe.download import download_bursts
+from burst2safe.safe import Safe
+from burst2safe.search import find_group
 
 
 DESCRIPTION = """Convert a stack of ASF burst SLCs to a stack of ESA SAFEs.
@@ -31,7 +32,8 @@ def burst2stack(
     all_anns: bool = False,
     keep_files: bool = False,
     work_dir: Optional[Path] = None,
-) -> Path:
+    parallel: bool = False,
+) -> List[Path]:
     """Convert a stack of burst granules to a stack of ESA SAFEs.
 
     Wraps the burst2safe function to handle multiple dates.
@@ -48,22 +50,44 @@ def burst2stack(
         keep_files: Keep the intermediate files
         work_dir: The directory to create the SAFE in (default: current directory)
     """
-    absolute_orbits = find_stack_orbits(rel_orbit, extent, start_date, end_date)
-    print(f'Creating SAFEs for {len(absolute_orbits)} time periods...')
-    for orbit in absolute_orbits:
-        print()
-        burst2safe(
-            granules=None,
-            orbit=orbit,
-            extent=extent,
-            polarizations=polarizations,
-            swaths=swaths,
-            mode=mode,
-            min_bursts=min_bursts,
-            all_anns=all_anns,
-            keep_files=keep_files,
-            work_dir=work_dir,
-        )
+    burst_search_results = find_group(
+        rel_orbit,
+        extent,
+        polarizations,
+        swaths,
+        mode,
+        min_bursts,
+        use_relative_orbit=True,
+        start_date=start_date,
+        end_date=end_date,
+    )
+    burst_infos = utils.get_burst_infos(burst_search_results, work_dir)
+    abs_orbits = utils.drop_duplicates([burst_info.absolute_orbit for burst_info in burst_infos])
+    print(f'Found {len(burst_infos)} burst(s), comprising {len(abs_orbits)} SAFE(s).')
+
+    print('Check burst group validities...')
+    burst_sets = [[bi for bi in burst_infos if bi.absolute_orbit == orbit] for orbit in abs_orbits]
+    # Checking burst group validities before download to fail faster
+    for burst_infos in burst_sets:
+        Safe.check_group_validity(burst_infos)
+
+    print('Downloading data...')
+    download_bursts(burst_infos, parallel=parallel)
+    [info.add_shape_info() for info in burst_infos]
+    [info.add_start_stop_utc() for info in burst_infos]
+    print('Download complete.')
+
+    print('Creating SAFEs...')
+    safe_paths = []
+    for burst_infos in burst_sets:
+        safe = Safe(burst_infos, all_anns, work_dir)
+        safe_path = safe.create_safe()
+        safe_paths.append(safe_path)
+        if not keep_files:
+            safe.cleanup()
+    print('SAFEs created!')
+
+    return safe_paths
 
 
 def main() -> None:
@@ -89,6 +113,7 @@ def main() -> None:
     )
     parser.add_argument('--keep-files', action='store_true', default=False, help='Keep the intermediate files')
    parser.add_argument('--output-dir', type=str, default=None, help='Output directory to save to')
+    parser.add_argument('--parallel', action='store_true', default=False, help='Download bursts in parallel')
 
     args = utils.reparse_args(parser.parse_args(), tool='burst2stack')
 
@@ -104,4 +129,5 @@ def main() -> None:
         all_anns=args.all_anns,
         keep_files=args.keep_files,
         work_dir=args.output_dir,
+        parallel=args.parallel,
     )
diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py
index bd4c3f6..4344b65 100644
--- a/src/burst2safe/download.py
+++ b/src/burst2safe/download.py
@@ -1,9 +1,12 @@
 import asyncio
+import os
+from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
 from typing import Iterable
 
 import aiohttp
+import numpy as np
-from tenacity import retry, retry_if_result, stop_after_delay, wait_fixed, wait_random
+from tenacity import retry, retry_if_result, stop_after_attempt, stop_after_delay, wait_fixed, wait_random
 
 from burst2safe.auth import check_earthdata_credentials
 from burst2safe.utils import BurstInfo
@@ -51,6 +54,7 @@ async def retry_get_response_async(session: aiohttp.ClientSession, url: str) ->
     return response
 
 
+@retry(wait=wait_fixed(0.5), stop=stop_after_attempt(3))
 async def download_response_async(response: aiohttp.ClientResponse, file_path: Path) -> None:
     """Download the response content to a file.
@@ -58,12 +62,16 @@ async def download_response_async(response: aiohttp.ClientResponse, file_path: P response: An aiohttp ClientResponse file_path: The path to save the response content to """ - if not file_path.parent.exists(): - raise ValueError(f'Error downloading, directory not found: {file_path.parent}') - - with open(file_path, 'wb') as f: - async for chunk in response.content.iter_chunked(2**14): - f.write(chunk) + try: + with open(file_path, 'wb') as f: + async for chunk in response.content.iter_chunked(2**14): + f.write(chunk) + response.close() + except Exception as e: + response.close() + if file_path.exists(): + file_path.unlink() + raise e async def response_producer(url_dict: dict, session: aiohttp.ClientSession, queue: asyncio.Queue) -> None: @@ -104,7 +112,7 @@ async def download_async(url_dict: dict) -> None: await asyncio.gather(response_producer(url_dict, session, queue), response_consumer(queue)) -def download_bursts(burst_infos: Iterable[BurstInfo]) -> None: +def download_bursts_async(burst_infos: Iterable[BurstInfo]) -> None: """Download the burst data and metadata files using an async queue. Args: @@ -112,7 +120,25 @@ def download_bursts(burst_infos: Iterable[BurstInfo]) -> None: """ tiffs, xmls = get_url_dict(burst_infos) check_earthdata_credentials() - asyncio.run(download_async({**tiffs, **xmls})) - missing_data = [x for x in {**tiffs, **xmls}.keys() if not x.exists] + asyncio.run(download_async({**xmls, **tiffs})) + + +def download_bursts(burst_infos: Iterable[BurstInfo], parallel: bool = False) -> None: + """Download the burst data and metadata files using multiple workers. + + Args: + burst_infos: A list of BurstInfo objects + """ + check_earthdata_credentials() + if parallel: + max_threads = min(len(burst_infos), os.cpu_count() + 2) + burst_info_sets = [list(x) for x in np.array_split(burst_infos, max_threads)] + with ThreadPoolExecutor(max_threads) as executor: + executor.map(download_bursts_async, burst_info_sets) + else: + download_bursts_async(burst_infos) + + tiffs, xmls = get_url_dict(burst_infos, force=True) + missing_data = [x for x in {**xmls, **tiffs}.keys() if not x.exists] if missing_data: raise ValueError(f'Error downloading, missing files: {", ".join(missing_data.name)}') diff --git a/src/burst2safe/search.py b/src/burst2safe/search.py index c711db0..7018f68 100644 --- a/src/burst2safe/search.py +++ b/src/burst2safe/search.py @@ -38,7 +38,7 @@ def find_granules(granules: Iterable[str]) -> List[S1BurstProduct]: return list(results) -def find_stack_orbits(rel_orbit: int, extent: Polygon, start_date: datetime, end_date: datetime) -> List[int]: +def find_stack_data(rel_orbit: int, extent: Polygon, start_date: datetime, end_date: datetime) -> List[int]: """Find all orbits in a stack using ASF Search. 
 
     Args:
@@ -54,11 +54,11 @@ def find_stack_orbits(rel_orbit: int, extent: Polygon, start_date: datetime, end
         dataset=dataset,
         relativeOrbit=rel_orbit,
         intersectsWith=extent.centroid.wkt,
-        start=start_date.strftime('%Y-%m-%d'),
-        end=end_date.strftime('%Y-%m-%d'),
+        start=f'{start_date.strftime("%Y-%m-%d")}T00:00:00Z',
+        end=f'{end_date.strftime("%Y-%m-%d")}T23:59:59Z',
     )
     absolute_orbits = list(set([int(result.properties['orbit']) for result in search_results]))
-    return absolute_orbits
+    return absolute_orbits, search_results
 
 
 def add_surrounding_bursts(bursts: List[S1BurstProduct], min_bursts: int) -> List[S1BurstProduct]:
@@ -94,28 +94,35 @@ def add_surrounding_bursts(bursts: List[S1BurstProduct], min_bursts: int) -> Lis
     return search_results
 
 
-def find_swath_pol_group(
-    search_results: List[S1BurstProduct], pol: str, swath: Optional[str], min_bursts: int
+def get_burst_group(
+    search_results: List[S1BurstProduct],
+    pol: str,
+    swath: Optional[str] = None,
+    orbit: Optional[int] = None,
+    min_bursts: int = 0,
 ) -> List[S1BurstProduct]:
-    """Find a group of bursts with the same polarization and swath.
+    """Find a group of bursts with the same polarization, swath and optionally orbit.
     Add surrounding bursts if the group is too small.
 
     Args:
         search_results: A list of S1BurstProduct objects
         pol: The polarization to search for
         swath: The swath to search for
+        orbit: The absolute orbit number of the bursts
         min_bursts: The minimum number of bursts per swath
 
     Returns:
         An updated list of S1BurstProduct objects
     """
+    params = []
+    if orbit:
+        search_results = [result for result in search_results if result.properties['orbit'] == orbit]
+        params.append(f'orbit {orbit}')
     if swath:
         search_results = [result for result in search_results if result.properties['burst']['subswath'] == swath]
-    search_results = [result for result in search_results if result.properties['polarization'] == pol]
-
-    params = [f'polarization {pol}']
-    if swath:
         params.append(f'swath {swath}')
+    search_results = [result for result in search_results if result.properties['polarization'] == pol]
+    params.append(f'polarization {pol}')
     params = ', '.join(params)
 
     if not search_results:
@@ -137,6 +144,9 @@ def find_group(
     swaths: Optional[Iterable] = None,
     mode: str = 'IW',
     min_bursts: int = 1,
+    start_date: Optional[datetime] = None,
+    end_date: Optional[datetime] = None,
+    use_relative_orbit: bool = False,
 ) -> List[S1BurstProduct]:
     """Find burst groups using ASF Search.
 
@@ -148,7 +158,8 @@ def find_group(
         swaths: List of swaths to include (default: all)
         mode: The collection mode to use (IW or EW) (default: IW)
         min_bursts: The minimum number of bursts per swath (default: 1)
+        use_relative_orbit: Use relative orbit number instead of absolute orbit number (default: False)
 
     Returns:
         A list of S1BurstProduct objects
@@ -171,13 +182,27 @@ def find_group(
     if bad_swaths:
         raise ValueError(f'Invalid swaths: {" ".join(bad_swaths)}')
 
-    dataset = asf_search.constants.DATASET.SLC_BURST
-    search_results = asf_search.geo_search(
-        dataset=dataset, absoluteOrbit=orbit, intersectsWith=footprint.wkt, beamMode=mode
-    )
+    if use_relative_orbit and not (start_date and end_date):
+        raise ValueError('You must provide start and end dates when using relative orbit number.')
+
+    opts = dict(dataset=asf_search.constants.DATASET.SLC_BURST, intersectsWith=footprint.wkt, beamMode=mode)
+    if use_relative_orbit:
+        opts['relativeOrbit'] = orbit
+        opts['start'] = f'{start_date.strftime("%Y-%m-%d")}T00:00:00Z'
+        opts['end'] = f'{end_date.strftime("%Y-%m-%d")}T23:59:59Z'
+    else:
+        opts['absoluteOrbit'] = orbit
+    search_results = asf_search.geo_search(**opts)
+
     final_results = []
-    for pol, swath in product(polarizations, swaths):
-        sub_results = find_swath_pol_group(search_results, pol, swath, min_bursts)
+    if use_relative_orbit:
+        absolute_orbits = list(set([int(result.properties['orbit']) for result in search_results]))
+        group_definitions = product(polarizations, swaths, absolute_orbits)
+    else:
+        group_definitions = product(polarizations, swaths)
+
+    for group_definition in group_definitions:
+        sub_results = get_burst_group(search_results, *group_definition, min_bursts=min_bursts)
         final_results.extend(sub_results)
     return final_results
 

From b4d7a43f08dd88b3ef356c4e6d4dfc33536dd49a Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Fri, 15 Nov 2024 14:56:42 -0600
Subject: [PATCH 13/41] remove multithreading

---
 src/burst2safe/download.py | 24 ++----------------------
 1 file changed, 2 insertions(+), 22 deletions(-)

diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py
index 4344b65..85e12d1 100644
--- a/src/burst2safe/download.py
+++ b/src/burst2safe/download.py
@@ -1,11 +1,8 @@
 import asyncio
-import os
-from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
 from typing import Iterable
 
 import aiohttp
-import numpy as np
 from tenacity import retry, retry_if_result, stop_after_attempt, stop_after_delay, wait_fixed, wait_random
 
 from burst2safe.auth import check_earthdata_credentials
@@ -112,32 +109,15 @@ async def download_async(url_dict: dict) -> None:
         await asyncio.gather(response_producer(url_dict, session, queue), response_consumer(queue))
 
 
-def download_bursts_async(burst_infos: Iterable[BurstInfo]) -> None:
+def download_bursts(burst_infos: Iterable[BurstInfo]) -> None:
     """Download the burst data and metadata files using an async queue.
 
     Args:
         burst_infos: A list of BurstInfo objects
     """
-    tiffs, xmls = get_url_dict(burst_infos)
     check_earthdata_credentials()
+    tiffs, xmls = get_url_dict(burst_infos)
     asyncio.run(download_async({**xmls, **tiffs}))
-
-
-def download_bursts(burst_infos: Iterable[BurstInfo], parallel: bool = False) -> None:
-    """Download the burst data and metadata files using multiple workers.
- - Args: - burst_infos: A list of BurstInfo objects - """ - check_earthdata_credentials() - if parallel: - max_threads = min(len(burst_infos), os.cpu_count() + 2) - burst_info_sets = [list(x) for x in np.array_split(burst_infos, max_threads)] - with ThreadPoolExecutor(max_threads) as executor: - executor.map(download_bursts_async, burst_info_sets) - else: - download_bursts_async(burst_infos) - tiffs, xmls = get_url_dict(burst_infos, force=True) missing_data = [x for x in {**xmls, **tiffs}.keys() if not x.exists] if missing_data: From c09cc3c9c728add63dcf66178072ec6ba010fa65 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Fri, 15 Nov 2024 15:27:22 -0600 Subject: [PATCH 14/41] remove parallel arg --- src/burst2safe/burst2stack.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/burst2safe/burst2stack.py b/src/burst2safe/burst2stack.py index 1d79ea2..69c8cc2 100644 --- a/src/burst2safe/burst2stack.py +++ b/src/burst2safe/burst2stack.py @@ -32,7 +32,6 @@ def burst2stack( all_anns: bool = False, keep_files: bool = False, work_dir: Optional[Path] = None, - parallel: bool = False, ) -> List[Path]: """Convert a stack of burst granules to a stack of ESA SAFEs. Wraps the burst2safe function to handle multiple dates. @@ -72,7 +71,7 @@ def burst2stack( Safe.check_group_validity(burst_infos) print('Downloading data...') - download_bursts(burst_infos, parallel=parallel) + download_bursts(burst_infos) [info.add_shape_info() for info in burst_infos] [info.add_start_stop_utc() for info in burst_infos] print('Download complete.') @@ -113,7 +112,6 @@ def main() -> None: ) parser.add_argument('--keep-files', action='store_true', default=False, help='Keep the intermediate files') parser.add_argument('--output-dir', type=str, default=None, help='Output directory to save to') - parser.add_argument('--parallel', action='store_true', default=False, help='Download bursts in parallel') args = utils.reparse_args(parser.parse_args(), tool='burst2stack') @@ -129,5 +127,4 @@ def main() -> None: all_anns=args.all_anns, keep_files=args.keep_files, work_dir=args.output_dir, - parallel=args.parallel, ) From 0823e192b8e0b8ff242196cbad55bf593783deed Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Mon, 18 Nov 2024 07:11:14 -0600 Subject: [PATCH 15/41] fix bug in burst2stack --- src/burst2safe/burst2stack.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/burst2safe/burst2stack.py b/src/burst2safe/burst2stack.py index 69c8cc2..8bed1ce 100644 --- a/src/burst2safe/burst2stack.py +++ b/src/burst2safe/burst2stack.py @@ -67,19 +67,19 @@ def burst2stack( print('Check burst group validities...') burst_sets = [[bi for bi in burst_infos if bi.absolute_orbit == orbit] for orbit in abs_orbits] # Checking burst group validities before download to fail faster - for burst_infos in burst_sets: - Safe.check_group_validity(burst_infos) + for burst_set in burst_sets: + Safe.check_group_validity(burst_set) print('Downloading data...') download_bursts(burst_infos) - [info.add_shape_info() for info in burst_infos] - [info.add_start_stop_utc() for info in burst_infos] print('Download complete.') print('Creating SAFEs...') safe_paths = [] - for burst_infos in burst_sets: - safe = Safe(burst_infos, all_anns, work_dir) + for burst_set in burst_sets: + [info.add_shape_info() for info in burst_set] + [info.add_start_stop_utc() for info in burst_set] + safe = Safe(burst_set, all_anns, work_dir) safe_path = safe.create_safe() safe_paths.append(safe_path) if not keep_files: From 
f4917fb40b1fbd6e99827af563260011747b8135 Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Mon, 18 Nov 2024 07:50:07 -0600
Subject: [PATCH 16/41] turn off forcing

---
 src/burst2safe/download.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py
index 85e12d1..df64a9d 100644
--- a/src/burst2safe/download.py
+++ b/src/burst2safe/download.py
@@ -118,7 +118,7 @@ def download_bursts(burst_infos: Iterable[BurstInfo]) -> None:
     check_earthdata_credentials()
     tiffs, xmls = get_url_dict(burst_infos)
     asyncio.run(download_async({**xmls, **tiffs}))
-    tiffs, xmls = get_url_dict(burst_infos, force=True)
+    tiffs, xmls = get_url_dict(burst_infos, force=False)
     missing_data = [x for x in {**xmls, **tiffs}.keys() if not x.exists()]
     if missing_data:
         raise ValueError(f'Error downloading, missing files: {", ".join(x.name for x in missing_data)}')

From 7e02c75ba539281f1d29beb37be32f72b15fdeb3 Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Tue, 19 Nov 2024 07:38:15 -0600
Subject: [PATCH 17/41] refactor and add filetype check

---
 src/burst2safe/download.py | 80 +++++++++++++------------------------
 1 file changed, 27 insertions(+), 53 deletions(-)

diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py
index df64a9d..de818e7 100644
--- a/src/burst2safe/download.py
+++ b/src/burst2safe/download.py
@@ -3,7 +3,7 @@
 from typing import Iterable
 
 import aiohttp
-from tenacity import retry, retry_if_result, stop_after_attempt, stop_after_delay, wait_fixed, wait_random
+from tenacity import retry, retry_if_result, stop_after_attempt, stop_after_delay, wait_random
 
 from burst2safe.auth import check_earthdata_credentials
 from burst2safe.utils import BurstInfo
@@ -19,46 +19,45 @@ def get_url_dict(burst_infos: Iterable[BurstInfo], force: bool = False) -> dict:
     Returns:
         A dictionary of URLs to download
     """
-    tiffs = {}
-    xmls = {}
+    url_dict = {}
     for burst_info in burst_infos:
         if force or not burst_info.data_path.exists():
-            tiffs[burst_info.data_path] = burst_info.data_url
-
+            url_dict[burst_info.data_path] = burst_info.data_url
         if force or not burst_info.metadata_path.exists():
-            xmls[burst_info.metadata_path] = burst_info.metadata_url
-    return tiffs, xmls
+            url_dict[burst_info.metadata_path] = burst_info.metadata_url
+    return url_dict
 
 
 @retry(
-    reraise=True,
-    retry=retry_if_result(lambda r: r.status == 202),
-    wait=wait_fixed(0.5) + wait_random(0, 1),
-    stop=stop_after_delay(120),
+    reraise=True, retry=retry_if_result(lambda r: r.status == 202), wait=wait_random(0, 1), stop=stop_after_delay(120)
 )
-async def retry_get_response_async(session: aiohttp.ClientSession, url: str) -> aiohttp.ClientResponse:
-    """Retry a GET request until a non-202 response is received.
+async def get_async(session: aiohttp.ClientSession, url: str) -> aiohttp.ClientResponse:
+    """Retry a GET request until a non-202 response is received
 
     Args:
         session: An aiohttp ClientSession
-        url: The URL to GET
+        url: The URL to download
 
     Returns:
-        An aiohttp ClientResponse
+        The response object
     """
     response = await session.get(url)
     response.raise_for_status()
     return response
 
 
-@retry(wait=wait_fixed(0.5), stop=stop_after_attempt(3))
-async def download_response_async(response: aiohttp.ClientResponse, file_path: Path) -> None:
-    """Download the response content to a file.
+@retry(reraise=True, stop=stop_after_attempt(5))
+async def download_url_async(session: aiohttp.ClientSession, url: str, file_path: Path) -> None:
+    """Retry a GET request until a non-202 response is received, then download data.
 
     Args:
-        response: An aiohttp ClientResponse
-        file_path: The path to save the response content to
+        session: An aiohttp ClientSession
+        url: The URL to download
+        file_path: The path to save the downloaded data to
     """
+    response = await get_async(session, url)
+    assert response.status == 200
+    assert Path(response.content_disposition.filename).suffix == file_path.suffix
     try:
         with open(file_path, 'wb') as f:
             async for chunk in response.content.iter_chunked(2**14):
                 f.write(chunk)
@@ -71,42 +70,17 @@ async def download_response_async(response: aiohttp.ClientResponse, file_path: P
         raise e
 
 
-async def response_producer(url_dict: dict, session: aiohttp.ClientSession, queue: asyncio.Queue) -> None:
-    """Produce responses to download and put them in a queue.
-
-    Args:
-        url_dict: A dictionary of URLs to download
-        session: An aiohttp ClientSession
-        queue: An asyncio Queue
-    """
-    for path, url in url_dict.items():
-        response = await retry_get_response_async(session, url=url)
-        await queue.put((response, path))
-    await queue.put((None, None))
-
-
-async def response_consumer(queue: asyncio.Queue) -> None:
-    """Consume responses from a queue and download them.
-
-    Args:
-        queue: An asyncio Queue
-    """
-    while True:
-        response, path = await queue.get()
-        if path is None:
-            break
-        await download_response_async(response, path)
-
-
 async def download_async(url_dict: dict) -> None:
     """Download a dictionary of URLs asynchronously.
 
     Args:
         url_dict: A dictionary of URLs to download
     """
-    queue = asyncio.Queue()
     async with aiohttp.ClientSession(trust_env=True) as session:
-        await asyncio.gather(response_producer(url_dict, session, queue), response_consumer(queue))
+        tasks = []
+        for file_path, url in url_dict.items():
+            tasks.append(download_url_async(session, url, file_path))
+        await asyncio.gather(*tasks)
 
 
 def download_bursts(burst_infos: Iterable[BurstInfo]) -> None:
@@ -116,9 +90,9 @@ def download_bursts(burst_infos: Iterable[BurstInfo]) -> None:
         burst_infos: A list of BurstInfo objects
     """
     check_earthdata_credentials()
-    tiffs, xmls = get_url_dict(burst_infos)
-    asyncio.run(download_async({**xmls, **tiffs}))
-    tiffs, xmls = get_url_dict(burst_infos, force=False)
-    missing_data = [x for x in {**xmls, **tiffs}.keys() if not x.exists()]
+    url_dict = get_url_dict(burst_infos)
+    asyncio.run(download_async(url_dict))
+    full_dict = get_url_dict(burst_infos, force=True)
+    missing_data = [x for x in full_dict.keys() if not x.exists()]
     if missing_data:
         raise ValueError(f'Error downloading, missing files: {", ".join(x.name for x in missing_data)}')

From ad78957abeb36740c50df75ee532bf9015ae5c13 Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Tue, 19 Nov 2024 08:38:41 -0600
Subject: [PATCH 18/41] account for race condition

---
 src/burst2safe/download.py | 23 +++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py
index de818e7..251bae3 100644
--- a/src/burst2safe/download.py
+++ b/src/burst2safe/download.py
@@ -46,8 +46,8 @@ async def get_async(session: aiohttp.ClientSession, url: str) -> aiohttp.ClientR
     return response
 
 
-@retry(reraise=True, stop=stop_after_attempt(5))
-async def download_url_async(session: aiohttp.ClientSession, url: str, file_path: Path) -> None:
+@retry(reraise=True, stop=stop_after_attempt(3))
+async def download_burst_url_async(session: aiohttp.ClientSession, url: str, file_path: Path) -> None:
     """Retry a GET request until a non-202 response is received, then download data.
 
     Args:
@@ -57,7 +57,18 @@ async def download_url_async(session: aiohttp.ClientSession, url: str, file_path
     """
     response = await get_async(session, url)
     assert response.status == 200
-    assert Path(response.content_disposition.filename).suffix == file_path.suffix
+
+    if file_path.suffix in ['.tif', '.tiff']:
+        returned_filename = response.content_disposition.filename
+    elif file_path.suffix == '.xml':
+        url_parts = str(response.url).split('/')
+        ext = response.content_disposition.filename.split('.')[-1]
+        returned_filename = f'{url_parts[3]}_{url_parts[5]}.{ext}'
+    else:
+        raise ValueError(f'Invalid file extension: {file_path.suffix}')
+    if file_path.name != returned_filename:
+        raise ValueError(f'Race condition encountered, incorrect url returned for file: {file_path.name}')
+
     try:
         with open(file_path, 'wb') as f:
             async for chunk in response.content.iter_chunked(2**14):
@@ -70,7 +81,7 @@ async def download_url_async(session: aiohttp.ClientSession, url: str, file_path
         raise e
 
 
-async def download_async(url_dict: dict) -> None:
+async def download_bursts_async(url_dict: dict) -> None:
     """Download a dictionary of URLs asynchronously.
 
     Args:
@@ -79,7 +90,7 @@ async def download_async(url_dict: dict) -> None:
     async with aiohttp.ClientSession(trust_env=True) as session:
         tasks = []
         for file_path, url in url_dict.items():
-            tasks.append(download_url_async(session, url, file_path))
+            tasks.append(download_burst_url_async(session, url, file_path))
         await asyncio.gather(*tasks)
 
 
@@ -91,7 +102,7 @@ def download_bursts(burst_infos: Iterable[BurstInfo]) -> None:
     """
     check_earthdata_credentials()
     url_dict = get_url_dict(burst_infos)
-    asyncio.run(download_async(url_dict))
+    asyncio.run(download_bursts_async(url_dict))
     full_dict = get_url_dict(burst_infos, force=True)
     missing_data = [x for x in full_dict.keys() if not x.exists()]
     if missing_data:

From 9f6a36acee30541db482745818e2c0b565ca16a9 Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Tue, 19 Nov 2024 14:55:01 -0600
Subject: [PATCH 19/41] add support for edl tokens

---
 src/burst2safe/auth.py     | 11 ++++++++---
 src/burst2safe/download.py | 24 +++++++++++++++++++-----
 2 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/src/burst2safe/auth.py b/src/burst2safe/auth.py
index 41a9778..d0b23b2 100644
--- a/src/burst2safe/auth.py
+++ b/src/burst2safe/auth.py
@@ -6,6 +6,7 @@
 
 
 EARTHDATA_HOST = 'urs.earthdata.nasa.gov'
+EARTHDATA_TOKEN_VAR = 'EDL_TOKEN'
 
 
 def get_netrc() -> Path:
@@ -79,14 +80,18 @@ def check_earthdata_credentials() -> None:
     """
     username, password = find_creds_in_netrc(EARTHDATA_HOST)
     if username and password:
-        return
+        return 'netrc'
 
     username, password = find_creds_in_env('EARTHDATA_USERNAME', 'EARTHDATA_PASSWORD')
     if username and password:
         write_credentials_to_netrc_file(username, password)
-        return
+        return 'netrc'
+
+    if os.getenv(EARTHDATA_TOKEN_VAR):
+        return 'token'
 
     raise ValueError(
         'Please provide NASA Earthdata credentials via your .netrc file,'
-        'or the EARTHDATA_USERNAME and EARTHDATA_PASSWORD environment variables.'
+        'the EARTHDATA_USERNAME and EARTHDATA_PASSWORD environment variables,'
+        'or an EDL Token via the EDL_TOKEN environment variable.'
     )
diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py
index 251bae3..674460b 100644
--- a/src/burst2safe/download.py
+++ b/src/burst2safe/download.py
@@ -1,4 +1,5 @@
 import asyncio
+import os
 from pathlib import Path
 from typing import Iterable
 
@@ -31,18 +32,26 @@ def get_url_dict(burst_infos: Iterable[BurstInfo], force: bool = False) -> dict:
 @retry(
     reraise=True, retry=retry_if_result(lambda r: r.status == 202), wait=wait_random(0, 1), stop=stop_after_delay(120)
 )
-async def get_async(session: aiohttp.ClientSession, url: str) -> aiohttp.ClientResponse:
+async def get_async(session: aiohttp.ClientSession, url: str, max_redirects: int = 5) -> aiohttp.ClientResponse:
     """Retry a GET request until a non-202 response is received
 
     Args:
         session: An aiohttp ClientSession
         url: The URL to download
+        max_redirects: The maximum number of redirects to follow
 
     Returns:
         The response object
     """
-    response = await session.get(url)
-    response.raise_for_status()
+    for i in range(max_redirects):
+        response = await session.get(url, allow_redirects=False)
+        response.raise_for_status()
+        if 300 <= response.status < 400:
+            url = response.headers['Location']
+        elif 200 <= response.status < 300:
+            break
+        elif i == max_redirects - 1:
+            raise Exception(f'Maximum number of redirects reached: {max_redirects}')
     return response
 
 
@@ -87,7 +96,13 @@ async def download_bursts_async(url_dict: dict) -> None:
     Args:
         url_dict: A dictionary of URLs to download
     """
-    async with aiohttp.ClientSession(trust_env=True) as session:
+    auth_type = check_earthdata_credentials()
+    if auth_type == 'token':
+        token = os.getenv('EDL_TOKEN')
+        headers = {aiohttp.hdrs.AUTHORIZATION: f'Bearer {token}'}
+    else:
+        headers = {}
+    async with aiohttp.ClientSession(headers=headers, trust_env=True) as session:
         tasks = []
         for file_path, url in url_dict.items():
             tasks.append(download_burst_url_async(session, url, file_path))
         await asyncio.gather(*tasks)
@@ -100,7 +115,6 @@ def download_bursts(burst_infos: Iterable[BurstInfo]) -> None:
     Args:
         burst_infos: A list of BurstInfo objects
     """
-    check_earthdata_credentials()
     url_dict = get_url_dict(burst_infos)
     asyncio.run(download_bursts_async(url_dict))
     full_dict = get_url_dict(burst_infos, force=True)
     missing_data = [x for x in full_dict.keys() if not x.exists()]
     if missing_data:

From 1b03538fa81f15d0f584cc67739d0141f78b11b4 Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Thu, 19 Dec 2024 14:03:33 -0600
Subject: [PATCH 20/41] minor old changes

---
 src/burst2safe/aiohttp_b2s.py   | 314 ++++++++++++++++++++++++++++++++
 src/burst2safe/token_example.py |  89 +++++++++
 2 files changed, 403 insertions(+)
 create mode 100644 src/burst2safe/aiohttp_b2s.py
 create mode 100644 src/burst2safe/token_example.py

diff --git a/src/burst2safe/aiohttp_b2s.py b/src/burst2safe/aiohttp_b2s.py
new file mode 100644
index 0000000..d8061a8
--- /dev/null
+++ b/src/burst2safe/aiohttp_b2s.py
@@ -0,0 +1,314 @@
+import asyncio
+import warnings
+from types import SimpleNamespace
+from typing import Any, Iterable, Mapping, Optional, Union
+
+from aiohttp import ClientSession, hdrs, payload
+from aiohttp.client import ClientTimeout
+from aiohttp.client_exceptions import ClientError, ClientOSError, InvalidURL, ServerTimeoutError, TooManyRedirects
+from aiohttp.client_reqrep import ClientResponse, Fingerprint, _merge_ssl_params
+from aiohttp.cookiejar import CookieJar
+from aiohttp.helpers import (
+    BasicAuth,
+    # CeilTimeout,
+    TimeoutHandle,
+    proxies_from_env,
+    sentinel,
+    strip_auth_from_url,
+)
+from aiohttp.tracing import Trace
+from aiohttp.typedefs import LooseCookies, LooseHeaders, StrOrURL
+from multidict import istr
+from yarl import URL + + +try: + from ssl import SSLContext +except ImportError: # pragma: no cover + SSLContext = object # type: ignore + + +class TrustingSession(ClientSession): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + async def _request( + self, + method: str, + str_or_url: StrOrURL, + *, + params: Optional[Mapping[str, str]] = None, + data: Any = None, + json: Any = None, + cookies: Optional[LooseCookies] = None, + headers: Optional[LooseHeaders] = None, + skip_auto_headers: Optional[Iterable[str]] = None, + auth: Optional[BasicAuth] = None, + allow_redirects: bool = True, + max_redirects: int = 10, + compress: Optional[str] = None, + chunked: Optional[bool] = None, + expect100: bool = False, + raise_for_status: Optional[bool] = None, + read_until_eof: bool = True, + proxy: Optional[StrOrURL] = None, + proxy_auth: Optional[BasicAuth] = None, + timeout: Union[ClientTimeout, object] = sentinel, + verify_ssl: Optional[bool] = None, + fingerprint: Optional[bytes] = None, + ssl_context: Optional[SSLContext] = None, + ssl: Optional[Union[SSLContext, bool, Fingerprint]] = None, + proxy_headers: Optional[LooseHeaders] = None, + trace_request_ctx: Optional[SimpleNamespace] = None, + read_bufsize: Optional[int] = None, + ) -> ClientResponse: + # NOTE: timeout clamps existing connect and read timeouts. We cannot + # set the default to None because we need to detect if the user wants + # to use the existing timeouts by setting timeout to None. + + if self.closed: + raise RuntimeError('Session is closed') + + ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint) + + if data is not None and json is not None: + raise ValueError('data and json parameters can not be used at the same time') + elif json is not None: + data = payload.JsonPayload(json, dumps=self._json_serialize) + + if not isinstance(chunked, bool) and chunked is not None: + warnings.warn('Chunk size is deprecated #1615', DeprecationWarning) + + redirects = 0 + history = [] + version = self._version + + # Merge with default headers and transform to CIMultiDict + headers = self._prepare_headers(headers) + proxy_headers = self._prepare_headers(proxy_headers) + + try: + url = URL(str_or_url) + except ValueError as e: + raise InvalidURL(str_or_url) from e + + skip_headers = set(self._skip_auto_headers) + if skip_auto_headers is not None: + for i in skip_auto_headers: + skip_headers.add(istr(i)) + + if proxy is not None: + try: + proxy = URL(proxy) + except ValueError as e: + raise InvalidURL(proxy) from e + + if timeout is sentinel: + real_timeout = self._timeout # type: ClientTimeout + else: + if not isinstance(timeout, ClientTimeout): + real_timeout = ClientTimeout(total=timeout) # type: ignore + else: + real_timeout = timeout + # timeout is cumulative for all request operations + # (request, redirects, responses, data consuming) + tm = TimeoutHandle(self._loop, real_timeout.total) + handle = tm.start() + + if read_bufsize is None: + read_bufsize = self._read_bufsize + + traces = [ + Trace( + self, + trace_config, + trace_config.trace_config_ctx(trace_request_ctx=trace_request_ctx), + ) + for trace_config in self._trace_configs + ] + + for trace in traces: + await trace.send_request_start(method, url, headers) + + timer = tm.timer() + try: + with timer: + while True: + url, auth_from_url = strip_auth_from_url(url) + if auth and auth_from_url: + raise ValueError('Cannot combine AUTH argument with ' 'credentials encoded in URL') + + if auth is None: + auth = auth_from_url + if auth is None: 
+ auth = self._default_auth + # It would be confusing if we support explicit + # Authorization header with auth argument + if headers is not None and auth is not None and hdrs.AUTHORIZATION in headers: + raise ValueError( + 'Cannot combine AUTHORIZATION header ' 'with AUTH argument or credentials ' 'encoded in URL' + ) + + all_cookies = self._cookie_jar.filter_cookies(url) + + if cookies is not None: + tmp_cookie_jar = CookieJar() + tmp_cookie_jar.update_cookies(cookies) + req_cookies = tmp_cookie_jar.filter_cookies(url) + if req_cookies: + all_cookies.load(req_cookies) + + if proxy is not None: + proxy = URL(proxy) + elif self._trust_env: + for scheme, proxy_info in proxies_from_env().items(): + if scheme == url.scheme: + proxy = proxy_info.proxy + proxy_auth = proxy_info.proxy_auth + break + + req = self._request_class( + method, + url, + params=params, + headers=headers, + skip_auto_headers=skip_headers, + data=data, + cookies=all_cookies, + auth=auth, + version=version, + compress=compress, + chunked=chunked, + expect100=expect100, + loop=self._loop, + response_class=self._response_class, + proxy=proxy, + proxy_auth=proxy_auth, + timer=timer, + session=self, + ssl=ssl, + proxy_headers=proxy_headers, + traces=traces, + ) + + # connection timeout + try: + with CeilTimeout(real_timeout.connect, loop=self._loop): + assert self._connector is not None + conn = await self._connector.connect(req, traces=traces, timeout=real_timeout) + except asyncio.TimeoutError as exc: + raise ServerTimeoutError('Connection timeout ' 'to host {}'.format(url)) from exc + + assert conn.transport is not None + + assert conn.protocol is not None + conn.protocol.set_response_params( + timer=timer, + skip_payload=method.upper() == 'HEAD', + read_until_eof=read_until_eof, + auto_decompress=self._auto_decompress, + read_timeout=real_timeout.sock_read, + read_bufsize=read_bufsize, + ) + + try: + try: + resp = await req.send(conn) + try: + await resp.start(conn) + except BaseException: + resp.close() + raise + except BaseException: + conn.close() + raise + except ClientError: + raise + except OSError as exc: + raise ClientOSError(*exc.args) from exc + + self._cookie_jar.update_cookies(resp.cookies, resp.url) + + # redirects + if resp.status in (301, 302, 303, 307, 308) and allow_redirects: + for trace in traces: + await trace.send_request_redirect(method, url, headers, resp) + + redirects += 1 + history.append(resp) + if max_redirects and redirects >= max_redirects: + resp.close() + raise TooManyRedirects(history[0].request_info, tuple(history)) + + # For 301 and 302, mimic IE, now changed in RFC + # https://github.com/kennethreitz/requests/pull/269 + if (resp.status == 303 and resp.method != hdrs.METH_HEAD) or ( + resp.status in (301, 302) and resp.method == hdrs.METH_POST + ): + method = hdrs.METH_GET + data = None + if headers.get(hdrs.CONTENT_LENGTH): + headers.pop(hdrs.CONTENT_LENGTH) + + r_url = resp.headers.get(hdrs.LOCATION) or resp.headers.get(hdrs.URI) + if r_url is None: + # see github.com/aio-libs/aiohttp/issues/2022 + break + else: + # reading from correct redirection + # response is forbidden + resp.release() + + try: + parsed_url = URL(r_url, encoded=not self._requote_redirect_url) + + except ValueError as e: + raise InvalidURL(r_url) from e + + scheme = parsed_url.scheme + if scheme not in ('http', 'https', ''): + resp.close() + raise ValueError('Can redirect only to http or https') + elif not scheme: + parsed_url = url.join(parsed_url) + + # if url.origin() != parsed_url.origin(): + # auth = None + # 
headers.pop(hdrs.AUTHORIZATION, None) + + url = parsed_url + params = None + resp.release() + continue + + break + + # check response status + if raise_for_status is None: + raise_for_status = self._raise_for_status + if raise_for_status: + resp.raise_for_status() + + # register connection + if handle is not None: + if resp.connection is not None: + resp.connection.add_callback(handle.cancel) + else: + handle.cancel() + + resp._history = tuple(history) + + for trace in traces: + await trace.send_request_end(method, url, headers, resp) + return resp + + except BaseException as e: + # cleanup timer + tm.close() + if handle: + handle.cancel() + handle = None + + for trace in traces: + await trace.send_request_exception(method, url, headers, e) + raise diff --git a/src/burst2safe/token_example.py b/src/burst2safe/token_example.py new file mode 100644 index 0000000..f67eba6 --- /dev/null +++ b/src/burst2safe/token_example.py @@ -0,0 +1,89 @@ +import asyncio +import os +from pathlib import Path +from urllib.parse import urlparse + +from aiohttp import ClientSession +from aiohttp.hdrs import AUTHORIZATION + + +TRUSTED_HOSTS = [ + 'urs.earthdata.nasa.gov', + 'cumulus.asf.alaska.edu', + 'sentinel1.asf.alaska.edu', + 'sentinel1-burst.asf.alaska.edu', + 'datapool.asf.alaska.edu', + 'auth.asf.alaska.edu', +] + + +async def download_file_token(url: str, out_path: Path) -> None: + token = os.getenv('EDL_TOKEN') + headers = {AUTHORIZATION: f'Bearer {token}'} + async with ClientSession(headers=headers, trust_env=True) as session: + response = await session.get(url) + response.raise_for_status() + with open(out_path, 'wb') as f: + async for chunk in response.content.iter_chunked(2**14): + f.write(chunk) + response.close() + + +async def download_file_netrc(url: str, out_path: Path) -> None: + async with ClientSession(trust_env=True) as session: + response = await session.get(url) + response.raise_for_status() + with open(out_path, 'wb') as f: + async for chunk in response.content.iter_chunked(2**14): + f.write(chunk) + response.close() + + +async def download_file_token_v2(url: str, out_path: Path, max_redirects: int = 8) -> None: + token = os.getenv('EDL_TOKEN') + headers = {AUTHORIZATION: f'Bearer {token}'} + async with ClientSession(headers=headers, trust_env=True) as session: + for i in range(max_redirects): + print(url) + host = urlparse(url).hostname + if host not in TRUSTED_HOSTS: + session.headers.pop(AUTHORIZATION, None) + response = await session.get(url, allow_redirects=False) + response.raise_for_status() + if 300 <= response.status < 400: + url = response.headers['Location'] + session.cookie_jar.update_cookies(response.cookies) + if i == max_redirects - 1: + raise Exception(f'Maximum number of redirects reached: {max_redirects}') + elif 200 <= response.status < 300: + break + + with open(out_path, 'wb') as f: + async for chunk in response.content.iter_chunked(2**14): + f.write(chunk) + + response.close() + + +def main(): + # test_xml = 'https://datapool.asf.alaska.edu/CSLC/OPERA-S1/OPERA_L2_CSLC-S1_T115-245714-IW1_20241113T141635Z_20241114T085631Z_S1A_VV_v1.1.iso.xml' + # test_url = 'https://datapool.asf.alaska.edu/CSLC/OPERA-S1/OPERA_L2_CSLC-S1_T115-245714-IW1_20241113T141635Z_20241114T085631Z_S1A_VV_v1.1.h5' + # test_safe = 'https://datapool.asf.alaska.edu/SLC/SA/S1A_IW_SLC__1SDV_20241116T130250_20241116T130317_056580_06F04D_176C.zip' + test_burst = 'https://sentinel1-burst.asf.alaska.edu/S1A_IW_SLC__1SDV_20241116T130250_20241116T130317_056580_06F04D_176C/IW2/VH/2.xml' + + # works when 
.netric is present, but EDL_TOKEN isn't + # asyncio.run(download_file_netrc(test_url, test_path)) + + # doesn't work when .netric isn't present, but EDL_TOKEN is + # asyncio.run(download_file_token(test_url, test_path)) + + # asyncio.run(download_file_token_v2(test_safe, 'test.safe')) + # print('---') + # asyncio.run(download_file_token_v2(test_burst, 'test.xml')) + asyncio.run(download_file_token_v2(test_burst, 'test.xml')) + # asyncio.run(download_file_netrc(test_burst, 'test.xml')) + # asyncio.run(download_file_token(test_url, test_path)) + + +if __name__ == '__main__': + main() From e34aa21d3f21fb635ff92abd63440d8a083af18e Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 23 Dec 2024 10:16:01 +0000 Subject: [PATCH 21/41] Bump ASFHyP3/actions from 0.12.0 to 0.13.2 Bumps [ASFHyP3/actions](https://github.com/asfhyp3/actions) from 0.12.0 to 0.13.2. - [Release notes](https://github.com/asfhyp3/actions/releases) - [Changelog](https://github.com/ASFHyP3/actions/blob/develop/CHANGELOG.md) - [Commits](https://github.com/asfhyp3/actions/compare/v0.12.0...v0.13.2) --- updated-dependencies: - dependency-name: ASFHyP3/actions dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- .github/workflows/changelog-check.yml | 2 +- .github/workflows/create-jira-issue.yml | 2 +- .github/workflows/labeled-pr-check.yml | 2 +- .github/workflows/release-checklist-comment.yml | 2 +- .github/workflows/release.yml | 2 +- .github/workflows/static-analysis.yml | 4 ++-- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/changelog-check.yml b/.github/workflows/changelog-check.yml index 3b1e740..232d149 100644 --- a/.github/workflows/changelog-check.yml +++ b/.github/workflows/changelog-check.yml @@ -13,4 +13,4 @@ on: jobs: call-changelog-check-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-changelog-check.yml@v0.12.0 + uses: ASFHyP3/actions/.github/workflows/reusable-changelog-check.yml@v0.13.2 diff --git a/.github/workflows/create-jira-issue.yml b/.github/workflows/create-jira-issue.yml index d95ef84..7646baa 100644 --- a/.github/workflows/create-jira-issue.yml +++ b/.github/workflows/create-jira-issue.yml @@ -6,7 +6,7 @@ on: jobs: call-create-jira-issue-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-create-jira-issue.yml@v0.12.0 + uses: ASFHyP3/actions/.github/workflows/reusable-create-jira-issue.yml@v0.13.2 secrets: JIRA_BASE_URL: ${{ secrets.JIRA_BASE_URL }} JIRA_USER_EMAIL: ${{ secrets.JIRA_USER_EMAIL }} diff --git a/.github/workflows/labeled-pr-check.yml b/.github/workflows/labeled-pr-check.yml index f408f3b..465aaa8 100644 --- a/.github/workflows/labeled-pr-check.yml +++ b/.github/workflows/labeled-pr-check.yml @@ -12,4 +12,4 @@ on: jobs: call-labeled-pr-check-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-labeled-pr-check.yml@v0.12.0 + uses: ASFHyP3/actions/.github/workflows/reusable-labeled-pr-check.yml@v0.13.2 diff --git a/.github/workflows/release-checklist-comment.yml b/.github/workflows/release-checklist-comment.yml index b5c711f..1c3f8a9 100644 --- a/.github/workflows/release-checklist-comment.yml +++ b/.github/workflows/release-checklist-comment.yml @@ -9,7 +9,7 @@ on: jobs: call-release-workflow: - uses: ASFHyP3/actions/.github/workflows/reusable-release-checklist-comment.yml@v0.12.0 + uses: ASFHyP3/actions/.github/workflows/reusable-release-checklist-comment.yml@v0.13.2 permissions: pull-requests: write secrets: 
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 26887e5..0a4dd92 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -8,7 +8,7 @@ on: jobs: call-release-workflow: # Docs: https://github.com/ASFHyP3/actions - uses: ASFHyP3/actions/.github/workflows/reusable-release.yml@v0.12.0 + uses: ASFHyP3/actions/.github/workflows/reusable-release.yml@v0.13.2 with: release_prefix: burst2safe secrets: diff --git a/.github/workflows/static-analysis.yml b/.github/workflows/static-analysis.yml index d8f7b24..35c37dd 100644 --- a/.github/workflows/static-analysis.yml +++ b/.github/workflows/static-analysis.yml @@ -5,7 +5,7 @@ on: [pull_request] jobs: call-secrets-analysis-workflow: # Docs: https://github.com/ASFHyP3/actions - uses: ASFHyP3/actions/.github/workflows/reusable-secrets-analysis.yml@v0.12.0 + uses: ASFHyP3/actions/.github/workflows/reusable-secrets-analysis.yml@v0.13.2 check-with-black: runs-on: ubuntu-latest @@ -18,4 +18,4 @@ jobs: call-ruff-workflow: # Docs: https://github.com/ASFHyP3/actions - uses: ASFHyP3/actions/.github/workflows/reusable-ruff.yml@v0.12.0 + uses: ASFHyP3/actions/.github/workflows/reusable-ruff.yml@v0.13.2 From 0b97f4bca3967b84b5067c8c6b2eef3573e080a7 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Thu, 2 Jan 2025 09:24:01 -0600 Subject: [PATCH 22/41] working tokens --- src/burst2safe/download.py | 26 +++++----- src/burst2safe/token_example.py | 89 --------------------------------- 2 files changed, 12 insertions(+), 103 deletions(-) delete mode 100644 src/burst2safe/token_example.py diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py index 674460b..798b673 100644 --- a/src/burst2safe/download.py +++ b/src/burst2safe/download.py @@ -10,6 +10,9 @@ from burst2safe.utils import BurstInfo +COOKIE_URL = 'https://sentinel1.asf.alaska.edu/METADATA_RAW/SA/S1A_IW_RAW__0SSV_20141229T072718_20141229T072750_003931_004B96_B79F.iso.xml' + + def get_url_dict(burst_infos: Iterable[BurstInfo], force: bool = False) -> dict: """Get a dictionary of URLs to download. Keys are save paths, and values are download URLs. 
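For orientation, the download flow this "working tokens" patch converges on (one token-authenticated aiohttp session, a single priming GET so the auth cookie lands on the session, then concurrent chunked downloads) can be sketched in isolation. This is an illustrative sketch only: the token variable, priming URL, and file manifest below are placeholders rather than the package's real values.

import asyncio
import os

import aiohttp

# Placeholder values for illustration only (not the package's real ones)
EDL_TOKEN = os.getenv('EDL_TOKEN', 'dummy-token')
PRIMING_URL = 'https://example.com/any-authenticated-product.iso.xml'
MANIFEST = {'burst_IW2_VV.tiff': 'https://example.com/burst_IW2_VV.tiff'}  # save path -> URL


async def fetch(session: aiohttp.ClientSession, url: str, path: str) -> None:
    # Stream each file to disk in chunks so large bursts never sit in memory
    response = await session.get(url)
    response.raise_for_status()
    try:
        with open(path, 'wb') as f:
            async for chunk in response.content.iter_chunked(2**14):
                f.write(chunk)
    finally:
        response.close()


async def download_all(manifest: dict) -> None:
    headers = {'Authorization': f'Bearer {EDL_TOKEN}'}
    async with aiohttp.ClientSession(headers=headers, trust_env=True) as session:
        # One authenticated GET first so the session cookie is set,
        # then every remaining file is fetched concurrently
        (await session.get(PRIMING_URL)).close()
        await asyncio.gather(*(fetch(session, url, path) for path, url in manifest.items()))


if __name__ == '__main__':
    asyncio.run(download_all(MANIFEST))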
@@ -43,15 +46,8 @@ async def get_async(session: aiohttp.ClientSession, url: str, max_redirects: int Returns: The response object """ - for i in range(max_redirects): - response = await session.get(url, allow_redirects=False) - response.raise_for_status() - if 300 <= response.status < 400: - url = response.headers['Location'] - elif 200 <= response.status < 300: - break - elif i == max_redirects - 1: - raise Exception(f'Maximum number of redirects reached: {max_redirects}') + response = await session.get(url) + response.raise_for_status() return response @@ -97,12 +93,14 @@ async def download_bursts_async(url_dict: dict) -> None: url_dict: A dictionary of URLs to download """ auth_type = check_earthdata_credentials() - if auth_type == 'token': - token = os.getenv('EDL_TOKEN') - headers = {aiohttp.hdrs.AUTHORIZATION: f'Bearer {token}'} - else: - headers = {} + headers = {'Authorization': f'Bearer {os.getenv("EDL_TOKEN")}'} if auth_type == 'token' else {} async with aiohttp.ClientSession(headers=headers, trust_env=True) as session: + if auth_type == 'token': + # FIXME: Needed while burst extractor API doesn't support EDL tokens + cookie_response = await session.get(COOKIE_URL) + cookie_response.raise_for_status() + cookie_response.close() + tasks = [] for file_path, url in url_dict.items(): tasks.append(download_burst_url_async(session, url, file_path)) diff --git a/src/burst2safe/token_example.py b/src/burst2safe/token_example.py deleted file mode 100644 index f67eba6..0000000 --- a/src/burst2safe/token_example.py +++ /dev/null @@ -1,89 +0,0 @@ -import asyncio -import os -from pathlib import Path -from urllib.parse import urlparse - -from aiohttp import ClientSession -from aiohttp.hdrs import AUTHORIZATION - - -TRUSTED_HOSTS = [ - 'urs.earthdata.nasa.gov', - 'cumulus.asf.alaska.edu', - 'sentinel1.asf.alaska.edu', - 'sentinel1-burst.asf.alaska.edu', - 'datapool.asf.alaska.edu', - 'auth.asf.alaska.edu', -] - - -async def download_file_token(url: str, out_path: Path) -> None: - token = os.getenv('EDL_TOKEN') - headers = {AUTHORIZATION: f'Bearer {token}'} - async with ClientSession(headers=headers, trust_env=True) as session: - response = await session.get(url) - response.raise_for_status() - with open(out_path, 'wb') as f: - async for chunk in response.content.iter_chunked(2**14): - f.write(chunk) - response.close() - - -async def download_file_netrc(url: str, out_path: Path) -> None: - async with ClientSession(trust_env=True) as session: - response = await session.get(url) - response.raise_for_status() - with open(out_path, 'wb') as f: - async for chunk in response.content.iter_chunked(2**14): - f.write(chunk) - response.close() - - -async def download_file_token_v2(url: str, out_path: Path, max_redirects: int = 8) -> None: - token = os.getenv('EDL_TOKEN') - headers = {AUTHORIZATION: f'Bearer {token}'} - async with ClientSession(headers=headers, trust_env=True) as session: - for i in range(max_redirects): - print(url) - host = urlparse(url).hostname - if host not in TRUSTED_HOSTS: - session.headers.pop(AUTHORIZATION, None) - response = await session.get(url, allow_redirects=False) - response.raise_for_status() - if 300 <= response.status < 400: - url = response.headers['Location'] - session.cookie_jar.update_cookies(response.cookies) - if i == max_redirects - 1: - raise Exception(f'Maximum number of redirects reached: {max_redirects}') - elif 200 <= response.status < 300: - break - - with open(out_path, 'wb') as f: - async for chunk in response.content.iter_chunked(2**14): - f.write(chunk) - - 
response.close() - - -def main(): - # test_xml = 'https://datapool.asf.alaska.edu/CSLC/OPERA-S1/OPERA_L2_CSLC-S1_T115-245714-IW1_20241113T141635Z_20241114T085631Z_S1A_VV_v1.1.iso.xml' - # test_url = 'https://datapool.asf.alaska.edu/CSLC/OPERA-S1/OPERA_L2_CSLC-S1_T115-245714-IW1_20241113T141635Z_20241114T085631Z_S1A_VV_v1.1.h5' - # test_safe = 'https://datapool.asf.alaska.edu/SLC/SA/S1A_IW_SLC__1SDV_20241116T130250_20241116T130317_056580_06F04D_176C.zip' - test_burst = 'https://sentinel1-burst.asf.alaska.edu/S1A_IW_SLC__1SDV_20241116T130250_20241116T130317_056580_06F04D_176C/IW2/VH/2.xml' - - # works when .netric is present, but EDL_TOKEN isn't - # asyncio.run(download_file_netrc(test_url, test_path)) - - # doesn't work when .netric isn't present, but EDL_TOKEN is - # asyncio.run(download_file_token(test_url, test_path)) - - # asyncio.run(download_file_token_v2(test_safe, 'test.safe')) - # print('---') - # asyncio.run(download_file_token_v2(test_burst, 'test.xml')) - asyncio.run(download_file_token_v2(test_burst, 'test.xml')) - # asyncio.run(download_file_netrc(test_burst, 'test.xml')) - # asyncio.run(download_file_token(test_url, test_path)) - - -if __name__ == '__main__': - main() From 949dfb0971a797d64de8327a27653394cede64b8 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Thu, 2 Jan 2025 09:32:06 -0600 Subject: [PATCH 23/41] remove unused class --- src/burst2safe/aiohttp_b2s.py | 314 ---------------------------------- 1 file changed, 314 deletions(-) delete mode 100644 src/burst2safe/aiohttp_b2s.py diff --git a/src/burst2safe/aiohttp_b2s.py b/src/burst2safe/aiohttp_b2s.py deleted file mode 100644 index d8061a8..0000000 --- a/src/burst2safe/aiohttp_b2s.py +++ /dev/null @@ -1,314 +0,0 @@ -import asyncio -import warnings -from types import SimpleNamespace -from typing import Any, Iterable, Mapping, Optional, Union - -from aiohttp import ClientSession, hdrs, payload -from aiohttp.client import ClientTimeout -from aiohttp.client_exceptions import ClientError, ClientOSError, InvalidURL, ServerTimeoutError, TooManyRedirects -from aiohttp.client_reqrep import ClientResponse, Fingerprint, _merge_ssl_params -from aiohttp.cookiejar import CookieJar -from aiohttp.helpers import ( - BasicAuth, - # CeilTimeout, - TimeoutHandle, - proxies_from_env, - sentinel, - strip_auth_from_url, -) -from aiohttp.tracing import Trace -from aiohttp.typedefs import LooseCookies, LooseHeaders, StrOrURL -from multidict import istr -from yarl import URL - - -try: - from ssl import SSLContext -except ImportError: # pragma: no cover - SSLContext = object # type: ignore - - -class TrustingSession(ClientSession): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - async def _request( - self, - method: str, - str_or_url: StrOrURL, - *, - params: Optional[Mapping[str, str]] = None, - data: Any = None, - json: Any = None, - cookies: Optional[LooseCookies] = None, - headers: Optional[LooseHeaders] = None, - skip_auto_headers: Optional[Iterable[str]] = None, - auth: Optional[BasicAuth] = None, - allow_redirects: bool = True, - max_redirects: int = 10, - compress: Optional[str] = None, - chunked: Optional[bool] = None, - expect100: bool = False, - raise_for_status: Optional[bool] = None, - read_until_eof: bool = True, - proxy: Optional[StrOrURL] = None, - proxy_auth: Optional[BasicAuth] = None, - timeout: Union[ClientTimeout, object] = sentinel, - verify_ssl: Optional[bool] = None, - fingerprint: Optional[bytes] = None, - ssl_context: Optional[SSLContext] = None, - ssl: Optional[Union[SSLContext, 
bool, Fingerprint]] = None, - proxy_headers: Optional[LooseHeaders] = None, - trace_request_ctx: Optional[SimpleNamespace] = None, - read_bufsize: Optional[int] = None, - ) -> ClientResponse: - # NOTE: timeout clamps existing connect and read timeouts. We cannot - # set the default to None because we need to detect if the user wants - # to use the existing timeouts by setting timeout to None. - - if self.closed: - raise RuntimeError('Session is closed') - - ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint) - - if data is not None and json is not None: - raise ValueError('data and json parameters can not be used at the same time') - elif json is not None: - data = payload.JsonPayload(json, dumps=self._json_serialize) - - if not isinstance(chunked, bool) and chunked is not None: - warnings.warn('Chunk size is deprecated #1615', DeprecationWarning) - - redirects = 0 - history = [] - version = self._version - - # Merge with default headers and transform to CIMultiDict - headers = self._prepare_headers(headers) - proxy_headers = self._prepare_headers(proxy_headers) - - try: - url = URL(str_or_url) - except ValueError as e: - raise InvalidURL(str_or_url) from e - - skip_headers = set(self._skip_auto_headers) - if skip_auto_headers is not None: - for i in skip_auto_headers: - skip_headers.add(istr(i)) - - if proxy is not None: - try: - proxy = URL(proxy) - except ValueError as e: - raise InvalidURL(proxy) from e - - if timeout is sentinel: - real_timeout = self._timeout # type: ClientTimeout - else: - if not isinstance(timeout, ClientTimeout): - real_timeout = ClientTimeout(total=timeout) # type: ignore - else: - real_timeout = timeout - # timeout is cumulative for all request operations - # (request, redirects, responses, data consuming) - tm = TimeoutHandle(self._loop, real_timeout.total) - handle = tm.start() - - if read_bufsize is None: - read_bufsize = self._read_bufsize - - traces = [ - Trace( - self, - trace_config, - trace_config.trace_config_ctx(trace_request_ctx=trace_request_ctx), - ) - for trace_config in self._trace_configs - ] - - for trace in traces: - await trace.send_request_start(method, url, headers) - - timer = tm.timer() - try: - with timer: - while True: - url, auth_from_url = strip_auth_from_url(url) - if auth and auth_from_url: - raise ValueError('Cannot combine AUTH argument with ' 'credentials encoded in URL') - - if auth is None: - auth = auth_from_url - if auth is None: - auth = self._default_auth - # It would be confusing if we support explicit - # Authorization header with auth argument - if headers is not None and auth is not None and hdrs.AUTHORIZATION in headers: - raise ValueError( - 'Cannot combine AUTHORIZATION header ' 'with AUTH argument or credentials ' 'encoded in URL' - ) - - all_cookies = self._cookie_jar.filter_cookies(url) - - if cookies is not None: - tmp_cookie_jar = CookieJar() - tmp_cookie_jar.update_cookies(cookies) - req_cookies = tmp_cookie_jar.filter_cookies(url) - if req_cookies: - all_cookies.load(req_cookies) - - if proxy is not None: - proxy = URL(proxy) - elif self._trust_env: - for scheme, proxy_info in proxies_from_env().items(): - if scheme == url.scheme: - proxy = proxy_info.proxy - proxy_auth = proxy_info.proxy_auth - break - - req = self._request_class( - method, - url, - params=params, - headers=headers, - skip_auto_headers=skip_headers, - data=data, - cookies=all_cookies, - auth=auth, - version=version, - compress=compress, - chunked=chunked, - expect100=expect100, - loop=self._loop, - 
response_class=self._response_class, - proxy=proxy, - proxy_auth=proxy_auth, - timer=timer, - session=self, - ssl=ssl, - proxy_headers=proxy_headers, - traces=traces, - ) - - # connection timeout - try: - with CeilTimeout(real_timeout.connect, loop=self._loop): - assert self._connector is not None - conn = await self._connector.connect(req, traces=traces, timeout=real_timeout) - except asyncio.TimeoutError as exc: - raise ServerTimeoutError('Connection timeout ' 'to host {}'.format(url)) from exc - - assert conn.transport is not None - - assert conn.protocol is not None - conn.protocol.set_response_params( - timer=timer, - skip_payload=method.upper() == 'HEAD', - read_until_eof=read_until_eof, - auto_decompress=self._auto_decompress, - read_timeout=real_timeout.sock_read, - read_bufsize=read_bufsize, - ) - - try: - try: - resp = await req.send(conn) - try: - await resp.start(conn) - except BaseException: - resp.close() - raise - except BaseException: - conn.close() - raise - except ClientError: - raise - except OSError as exc: - raise ClientOSError(*exc.args) from exc - - self._cookie_jar.update_cookies(resp.cookies, resp.url) - - # redirects - if resp.status in (301, 302, 303, 307, 308) and allow_redirects: - for trace in traces: - await trace.send_request_redirect(method, url, headers, resp) - - redirects += 1 - history.append(resp) - if max_redirects and redirects >= max_redirects: - resp.close() - raise TooManyRedirects(history[0].request_info, tuple(history)) - - # For 301 and 302, mimic IE, now changed in RFC - # https://github.com/kennethreitz/requests/pull/269 - if (resp.status == 303 and resp.method != hdrs.METH_HEAD) or ( - resp.status in (301, 302) and resp.method == hdrs.METH_POST - ): - method = hdrs.METH_GET - data = None - if headers.get(hdrs.CONTENT_LENGTH): - headers.pop(hdrs.CONTENT_LENGTH) - - r_url = resp.headers.get(hdrs.LOCATION) or resp.headers.get(hdrs.URI) - if r_url is None: - # see github.com/aio-libs/aiohttp/issues/2022 - break - else: - # reading from correct redirection - # response is forbidden - resp.release() - - try: - parsed_url = URL(r_url, encoded=not self._requote_redirect_url) - - except ValueError as e: - raise InvalidURL(r_url) from e - - scheme = parsed_url.scheme - if scheme not in ('http', 'https', ''): - resp.close() - raise ValueError('Can redirect only to http or https') - elif not scheme: - parsed_url = url.join(parsed_url) - - # if url.origin() != parsed_url.origin(): - # auth = None - # headers.pop(hdrs.AUTHORIZATION, None) - - url = parsed_url - params = None - resp.release() - continue - - break - - # check response status - if raise_for_status is None: - raise_for_status = self._raise_for_status - if raise_for_status: - resp.raise_for_status() - - # register connection - if handle is not None: - if resp.connection is not None: - resp.connection.add_callback(handle.cancel) - else: - handle.cancel() - - resp._history = tuple(history) - - for trace in traces: - await trace.send_request_end(method, url, headers, resp) - return resp - - except BaseException as e: - # cleanup timer - tm.close() - if handle: - handle.cancel() - handle = None - - for trace in traces: - await trace.send_request_exception(method, url, headers, e) - raise From 727465f047d8a10e3c8f5eefa6f6b9979f762c47 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Thu, 2 Jan 2025 09:40:48 -0600 Subject: [PATCH 24/41] update changelog --- CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d4b2c48..aba4c02 100644 --- 
a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,17 @@
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [PEP 440](https://www.python.org/dev/peps/pep-0440/) and uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [1.4.0]
+
+### Added
+* A new `download.py` module that supports asynchronous downloads.
+* Support for EDL token-based authentication.
+
+### Changed
+* Authorization behavior so that EDL credentials in a user's netrc file are prioritized. Credentials provided as environment variables are now written to the netrc file.
+* Switched to an asynchronous download approach.
+* In `burst2stack.py`, all input files are now downloaded first.
+
 ## [1.3.1]
 
 ### Changed

From e766f1b3dee2f02b457aab6b77b7c7cb4266e9f7 Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Thu, 2 Jan 2025 09:42:16 -0600
Subject: [PATCH 25/41] fix ruff

---
 src/burst2safe/burst2safe.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/burst2safe/burst2safe.py b/src/burst2safe/burst2safe.py
index 96d719f..e2789e1 100644
--- a/src/burst2safe/burst2safe.py
+++ b/src/burst2safe/burst2safe.py
@@ -8,9 +8,9 @@
 from shapely.geometry import Polygon
 
 from burst2safe import utils
+from burst2safe.download import download_bursts
 from burst2safe.safe import Safe
 from burst2safe.search import find_bursts
-from burst2safe.download import download_bursts
 
 
 DESCRIPTION = """Convert a set of ASF burst SLCs to the ESA SAFE format.

From cc76a7a64885f2d2dcba862289b1a486499e0f18 Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Thu, 2 Jan 2025 09:50:15 -0600
Subject: [PATCH 26/41] fix fstring quotes

---
 environment.yml          | 2 +-
 src/burst2safe/search.py | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/environment.yml b/environment.yml
index e9a62ca..9efeb4f 100644
--- a/environment.yml
+++ b/environment.yml
@@ -3,7 +3,7 @@ channels:
   - conda-forge
   - nodefaults
 dependencies:
-  - python>=3.9
+  - python<3.10
   - pip
   - gdal
   - shapely>=2
diff --git a/src/burst2safe/search.py b/src/burst2safe/search.py
index 4717449..3e35532 100644
--- a/src/burst2safe/search.py
+++ b/src/burst2safe/search.py
@@ -55,8 +55,8 @@ def find_stack_data(rel_orbit: int, extent: Polygon, start_date: datetime, end_d
         dataset=dataset,
         relativeOrbit=rel_orbit,
         intersectsWith=extent.centroid.wkt,
-        start=f'{start_date.strftime('%Y-%m-%d')}T00:00:00Z',
-        end=f'{end_date.strftime('%Y-%m-%d')}T23:59:59Z',
+        start=f'{start_date.strftime("%Y-%m-%d")}T00:00:00Z',
+        end=f'{end_date.strftime("%Y-%m-%d")}T23:59:59Z',
     )
     absolute_orbits = list(set([int(result.properties['orbit']) for result in search_results]))
     return absolute_orbits, search_results
@@ -189,8 +189,8 @@ def find_group(
     opts = dict(dataset=asf_search.constants.DATASET.SLC_BURST, intersectsWith=footprint.wkt, beamMode=mode)
     if use_relative_orbit:
         opts['relativeOrbit'] = orbit
-        opts['start'] = (f'{start_date.strftime('%Y-%m-%d')}T00:00:00Z',)
-        opts['end'] = (f'{end_date.strftime('%Y-%m-%d')}T23:59:59Z',)
+        opts['start'] = (f'{start_date.strftime("%Y-%m-%d")}T00:00:00Z',)
+        opts['end'] = (f'{end_date.strftime("%Y-%m-%d")}T23:59:59Z',)
     else:
         opts['absoluteOrbit'] = orbit
     search_results = asf_search.geo_search(**opts)

From ed769c6c44a9b63b2d145fec9730283489815568 Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Thu, 2 Jan 2025 09:50:44 -0600
Subject: [PATCH 27/41] fix python version

---
 environment.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/environment.yml b/environment.yml
index 9efeb4f..e9a62ca 100644
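For context on the quote changes in the "fix fstring quotes" patch: before Python 3.12 (PEP 701), an f-string cannot reuse its own quote character inside a replacement field, so the original single-quoted form is a SyntaxError on the older interpreters this project supports. A minimal illustration:

from datetime import datetime

start_date = datetime(2024, 1, 1)
# SyntaxError on Python < 3.12 (single quotes nested inside a single-quoted f-string):
#   start = f'{start_date.strftime('%Y-%m-%d')}T00:00:00Z'
# Portable form, as adopted in the patch:
start = f'{start_date.strftime("%Y-%m-%d")}T00:00:00Z'
print(start)  # 2024-01-01T00:00:00Z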
--- a/environment.yml +++ b/environment.yml @@ -3,7 +3,7 @@ channels: - conda-forge - nodefaults dependencies: - - python<3.10 + - python>=3.9 - pip - gdal - shapely>=2 From a71a7c9cf8a7d5777f57f151eaec5d60d303e55c Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Fri, 3 Jan 2025 16:16:05 -0600 Subject: [PATCH 28/41] refactor auth for review --- src/burst2safe/auth.py | 27 +++++++++++++------ src/burst2safe/download.py | 6 ++--- src/burst2safe/search.py | 2 +- tests/test_auth.py | 54 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 77 insertions(+), 12 deletions(-) diff --git a/src/burst2safe/auth.py b/src/burst2safe/auth.py index d0b23b2..2181cd8 100644 --- a/src/burst2safe/auth.py +++ b/src/burst2safe/auth.py @@ -6,7 +6,7 @@ EARTHDATA_HOST = 'urs.earthdata.nasa.gov' -EARTHDATA_TOKEN_VAR = 'EDL_TOKEN' +TOKEN_ENV_VAR = 'EARTHDATA_TOKEN' def get_netrc() -> Path: @@ -73,22 +73,33 @@ def write_credentials_to_netrc_file(username: str, password: str) -> None: f.write(f'machine {EARTHDATA_HOST} login {username} password {password}\n') -def check_earthdata_credentials() -> None: +def check_earthdata_credentials(append=False) -> str: """Check for NASA EarthData credentials in the netrc file or environment variables. - Will preferentially use the netrc file, and write credentials to the netrc file if found in the environment. + + Args: + append: Whether to append the credentials to the netrc file if creds found in the environment + + Returns: + The method used to find the credentials ('netrc' or 'token') """ + if os.getenv(TOKEN_ENV_VAR): + return 'token' + username, password = find_creds_in_netrc(EARTHDATA_HOST) if username and password: return 'netrc' username, password = find_creds_in_env('EARTHDATA_USERNAME', 'EARTHDATA_PASSWORD') if username and password: - write_credentials_to_netrc_file(username, password) - return 'netrc' - - if os.getenv(EARTHDATA_TOKEN_VAR): - return 'token' + if append: + write_credentials_to_netrc_file(username, password) + return 'netrc' + else: + raise ValueError( + 'NASA Earthdata credentials only found in environment variables,' + 'but appending to netrc file not allowed. Please allow appending to netrc.' 
+    )
 
     raise ValueError(
         'Please provide NASA Earthdata credentials via your .netrc file,'
diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py
index 798b673..ac9895b 100644
--- a/src/burst2safe/download.py
+++ b/src/burst2safe/download.py
@@ -6,7 +6,7 @@
 import aiohttp
 from tenacity import retry, retry_if_result, stop_after_attempt, stop_after_delay, wait_random
 
-from burst2safe.auth import check_earthdata_credentials
+from burst2safe.auth import TOKEN_ENV_VAR, check_earthdata_credentials
 from burst2safe.utils import BurstInfo
 
 
@@ -92,8 +92,8 @@ async def download_bursts_async(url_dict: dict) -> None:
     Args:
         url_dict: A dictionary of URLs to download
     """
-    auth_type = check_earthdata_credentials()
-    headers = {'Authorization': f'Bearer {os.getenv("EDL_TOKEN")}'} if auth_type == 'token' else {}
+    auth_type = check_earthdata_credentials(append=True)
+    headers = {'Authorization': f'Bearer {os.getenv(TOKEN_ENV_VAR)}'} if auth_type == 'token' else {}
     async with aiohttp.ClientSession(headers=headers, trust_env=True) as session:
         if auth_type == 'token':
             # FIXME: Needed while burst extractor API doesn't support EDL tokens
diff --git a/src/burst2safe/search.py b/src/burst2safe/search.py
index 3e35532..a11f3cb 100644
--- a/src/burst2safe/search.py
+++ b/src/burst2safe/search.py
@@ -254,7 +254,7 @@ def download_bursts(burst_infos: Iterable[BurstInfo]) -> None:
     download_info = [(value, key.parent, key.name) for key, value in downloads.items()]
     urls, dirs, names = zip(*download_info)
 
-    check_earthdata_credentials()
+    check_earthdata_credentials(append=True)
     session = asf_search.ASFSession()
     n_workers = min(len(urls), max(cpu_count() - 2, 1))
     if n_workers == 1:
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 5bbf0e8..d844bbd 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -1,5 +1,7 @@
 from pathlib import Path
 
+import pytest
+
 from burst2safe import auth
 
 
@@ -35,3 +37,55 @@ def test_find_creds_in_netrc(tmp_path, monkeypatch):
         m.setattr(auth, 'get_netrc', lambda: tmp_path / '.netrc')
         (tmp_path / '.netrc').write_text('')
         assert auth.find_creds_in_netrc('test') == (None, None)
+
+
+def test_write_credentials_to_netrc_file(tmp_path, monkeypatch):
+    with monkeypatch.context() as m:
+        m.setattr(auth, 'get_netrc', lambda: tmp_path / '.netrc')
+        auth.write_credentials_to_netrc_file('foo', 'bar')
+        assert (tmp_path / '.netrc').read_text() == 'machine urs.earthdata.nasa.gov login foo password bar\n'
+
+
+def test_check_earthdata_credentials_token(tmp_path, monkeypatch):
+    with monkeypatch.context() as m:
+        m.setenv('EARTHDATA_TOKEN', 'foo')
+        assert auth.check_earthdata_credentials() == 'token'
+
+
+def test_check_earthdata_credentials_netrc(tmp_path, monkeypatch):
+    netrc_path = tmp_path / '.netrc'
+    netrc_path.touch()
+    netrc_path.write_text('machine urs.earthdata.nasa.gov login foo password bar\n')
+    with monkeypatch.context() as m:
+        m.delenv('EARTHDATA_TOKEN', raising=False)
+        m.setenv('EARTHDATA_USERNAME', 'baz')
+        m.setenv('EARTHDATA_PASSWORD', 'buzz')
+        m.setattr(auth, 'get_netrc', lambda: netrc_path)
+        assert auth.check_earthdata_credentials() == 'netrc'
+        assert netrc_path.read_text() == 'machine urs.earthdata.nasa.gov login foo password bar\n'
+
+
+def test_check_earthdata_credentials_env(tmp_path, monkeypatch):
+    netrc_path = tmp_path / '.netrc'
+    with monkeypatch.context() as m:
+        m.delenv('EARTHDATA_TOKEN', raising=False)
+        m.setenv('EARTHDATA_USERNAME', 'baz')
+        m.setenv('EARTHDATA_PASSWORD', 'buzz')
+        m.setattr(auth, 'get_netrc', lambda: netrc_path)
+
+        with pytest.raises(ValueError, match='NASA Earthdata credentials only found in environment variables*'):
+            auth.check_earthdata_credentials()
+
+        assert auth.check_earthdata_credentials(append=True) == 'netrc'
+        assert netrc_path.read_text() == 'machine urs.earthdata.nasa.gov login baz password buzz\n'
+
+
+def test_check_earthdata_credentials_none(tmp_path, monkeypatch):
+    netrc_path = tmp_path / '.netrc'
+    with monkeypatch.context() as m:
+        m.delenv('EDL_TOKEN', raising=False)
+        m.delenv('EDL_USERNAME', raising=False)
+        m.delenv('EDL_PASSWORD', raising=False)
+        m.setattr(auth, 'get_netrc', lambda: netrc_path)
+        with pytest.raises(ValueError, match='Please provide NASA Earthdata credentials*'):
+            auth.check_earthdata_credentials()

From a39d44513f4a632d60b9625bcbc842503c5d6786 Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Fri, 3 Jan 2025 16:18:11 -0600
Subject: [PATCH 29/41] remove references to EDL_TOKEN

---
 src/burst2safe/auth.py | 2 +-
 tests/test_auth.py     | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/burst2safe/auth.py b/src/burst2safe/auth.py
index 2181cd8..291e9f1 100644
--- a/src/burst2safe/auth.py
+++ b/src/burst2safe/auth.py
@@ -104,5 +104,5 @@ def check_earthdata_credentials(append=False) -> str:
     raise ValueError(
         'Please provide NASA Earthdata credentials via your .netrc file,'
         'the EARTHDATA_USERNAME and EARTHDATA_PASSWORD environment variables,'
-        'or an EDL Token via the EDL_TOKEN environment variable.'
+        'or an EDL Token via the EARTHDATA_TOKEN environment variable.'
     )
diff --git a/tests/test_auth.py b/tests/test_auth.py
index d844bbd..95bd60d 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -83,9 +83,9 @@ def test_check_earthdata_credentials_none(tmp_path, monkeypatch):
 def test_check_earthdata_credentials_none(tmp_path, monkeypatch):
     netrc_path = tmp_path / '.netrc'
     with monkeypatch.context() as m:
-        m.delenv('EDL_TOKEN', raising=False)
-        m.delenv('EDL_USERNAME', raising=False)
-        m.delenv('EDL_PASSWORD', raising=False)
+        m.delenv('EARTHDATA_TOKEN', raising=False)
+        m.delenv('EARTHDATA_USERNAME', raising=False)
+        m.delenv('EARTHDATA_PASSWORD', raising=False)
         m.setattr(auth, 'get_netrc', lambda: netrc_path)
         with pytest.raises(ValueError, match='Please provide NASA Earthdata credentials*'):
             auth.check_earthdata_credentials()

From e333241d81241f23661388901b83a11137473e55 Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Fri, 3 Jan 2025 16:20:24 -0600
Subject: [PATCH 30/41] remove old download methods

---
 src/burst2safe/search.py | 31 -------------------------------
 1 file changed, 31 deletions(-)

diff --git a/src/burst2safe/search.py b/src/burst2safe/search.py
index a11f3cb..79c1027 100644
--- a/src/burst2safe/search.py
+++ b/src/burst2safe/search.py
@@ -1,11 +1,7 @@
-"""A package for converting ASF burst SLCs to the SAFE format"""
-
 import warnings
 from collections.abc import Iterable
-from concurrent.futures import ProcessPoolExecutor
 from datetime import datetime
 from itertools import product
-from multiprocessing import cpu_count
 from pathlib import Path
 from typing import List, Optional
 
@@ -14,9 +10,6 @@
 from asf_search.Products.S1BurstProduct import S1BurstProduct
 from shapely.geometry import Polygon
 
-from burst2safe.auth import check_earthdata_credentials
-from burst2safe.utils import BurstInfo, download_url_with_retries
-
 warnings.filterwarnings('ignore')
 
 
@@ -239,27 +232,3 @@ def find_bursts(
             'You must provide either a list of granules or minimum set of group parameters (orbit, and footprint).'
) return results - - -def download_bursts(burst_infos: Iterable[BurstInfo]) -> None: - """Download the burst data and metadata files using multiple workers. - - Args: - burst_infos: A list of BurstInfo objects - """ - downloads = {} - for burst_info in burst_infos: - downloads[burst_info.data_path] = burst_info.data_url - downloads[burst_info.metadata_path] = burst_info.metadata_url - download_info = [(value, key.parent, key.name) for key, value in downloads.items()] - urls, dirs, names = zip(*download_info) - - check_earthdata_credentials(append=True) - session = asf_search.ASFSession() - n_workers = min(len(urls), max(cpu_count() - 2, 1)) - if n_workers == 1: - for url, dir, name in zip(urls, dirs, names): - download_url_with_retries(url, dir, name, session) - else: - with ProcessPoolExecutor(max_workers=n_workers) as executor: - executor.map(download_url_with_retries, urls, dirs, names, [session] * len(urls)) From 6bce9659fd778f151656e03e38a673f4631a97ac Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Fri, 3 Jan 2025 16:21:35 -0600 Subject: [PATCH 31/41] fix docstring --- src/burst2safe/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/burst2safe/auth.py b/src/burst2safe/auth.py index 291e9f1..2086660 100644 --- a/src/burst2safe/auth.py +++ b/src/burst2safe/auth.py @@ -81,7 +81,7 @@ def check_earthdata_credentials(append=False) -> str: append: Whether to append the credentials to the netrc file if creds found in the environment Returns: - The method used to find the credentials ('netrc' or 'token') + The location of the preferred credentials ('netrc' or 'token') """ if os.getenv(TOKEN_ENV_VAR): return 'token' From 3aa8aaa2d0b1e7f896395ed404b9037a02812419 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Fri, 3 Jan 2025 16:35:48 -0600 Subject: [PATCH 32/41] clean up search --- src/burst2safe/search.py | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/src/burst2safe/search.py b/src/burst2safe/search.py index 79c1027..c240c38 100644 --- a/src/burst2safe/search.py +++ b/src/burst2safe/search.py @@ -32,29 +32,6 @@ def find_granules(granules: Iterable[str]) -> List[S1BurstProduct]: return list(results) -def find_stack_data(rel_orbit: int, extent: Polygon, start_date: datetime, end_date: datetime) -> List[int]: - """Find all orbits in a stack using ASF Search. - - Args: - rel_orbit: The relative orbit number of the stack - start_date: The start date of the stack - end_date: The end date of the stack - - Returns: - List of absolute orbit numbers - """ - dataset = asf_search.constants.DATASET.SLC_BURST - search_results = asf_search.geo_search( - dataset=dataset, - relativeOrbit=rel_orbit, - intersectsWith=extent.centroid.wkt, - start=f'{start_date.strftime("%Y-%m-%d")}T00:00:00Z', - end=f'{end_date.strftime("%Y-%m-%d")}T23:59:59Z', - ) - absolute_orbits = list(set([int(result.properties['orbit']) for result in search_results])) - return absolute_orbits, search_results - - def add_surrounding_bursts(bursts: List[S1BurstProduct], min_bursts: int) -> List[S1BurstProduct]: """Add bursts to the list to ensure each swath has at least `min_bursts` bursts. All bursts must be from the same absolute orbit, swath, and polarization. 
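For reference, the core call of the helper removed above (resolving a relative orbit and a date window to the absolute orbits of a stack) can be sketched standalone; the orbit number, centroid, and dates below are placeholder values:

from datetime import datetime

import asf_search
from shapely.geometry import Point

rel_orbit = 87  # placeholder relative orbit
centroid = Point(-91.0, 41.0)  # placeholder extent centroid
start_date, end_date = datetime(2024, 1, 1), datetime(2024, 2, 1)

search_results = asf_search.geo_search(
    dataset=asf_search.constants.DATASET.SLC_BURST,
    relativeOrbit=rel_orbit,
    intersectsWith=centroid.wkt,
    start=f'{start_date.strftime("%Y-%m-%d")}T00:00:00Z',
    end=f'{end_date.strftime("%Y-%m-%d")}T23:59:59Z',
)
# Each acquisition in the stack contributes one absolute orbit number
absolute_orbits = sorted({int(result.properties['orbit']) for result in search_results})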
@@ -151,6 +128,8 @@ def find_group( swaths: List of swaths to include (default: all) mode: The collection mode to use (IW or EW) (default: IW) min_bursts: The minimum number of bursts per swath (default: 1) + start_date: The start date for relative orbit search + end_date: The end date for relative orbit search use_relative_orbit: Use relative orbit number instead of absolute orbit number (default: False) Returns: From 2c2d4ab1faeb03943171fc33692848063f84bc9e Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Sat, 4 Jan 2025 13:14:45 -0600 Subject: [PATCH 33/41] remove unused function --- src/burst2safe/burst2stack.py | 2 +- src/burst2safe/utils.py | 26 -------------------------- 2 files changed, 1 insertion(+), 27 deletions(-) diff --git a/src/burst2safe/burst2stack.py b/src/burst2safe/burst2stack.py index 8bed1ce..795db40 100644 --- a/src/burst2safe/burst2stack.py +++ b/src/burst2safe/burst2stack.py @@ -41,8 +41,8 @@ def burst2stack( start_date: The start date of the bursts end_date: The end date of the bursts extent: The bounding box of the bursts - swaths: List of swaths to include polarizations: List of polarizations to include + swaths: List of swaths to include mode: The collection mode to use (IW or EW) (default: IW) min_bursts: The minimum number of bursts per swath (default: 1) all_anns: Include product annotation files for all swaths, regardless of included bursts diff --git a/src/burst2safe/utils.py b/src/burst2safe/utils.py index a565299..3b04e6c 100644 --- a/src/burst2safe/utils.py +++ b/src/burst2safe/utils.py @@ -8,7 +8,6 @@ from pathlib import Path from typing import Dict, List, Optional -import asf_search import lxml.etree as ET from asf_search.Products.S1BurstProduct import S1BurstProduct from osgeo import gdal, ogr, osr @@ -223,31 +222,6 @@ def get_subxml_from_metadata( return desired_metadata -def download_url_with_retries( - url: str, path: str, filename: str = None, session: asf_search.ASFSession = None, max_retries: int = 3 -) -> None: - """Download a file using asf_search.download_url with retries and backoff. 
- - Args: - url: The URL to download - path: The path to save the file to - filename: The name of the file to save - session: The ASF session to use - max_retries: The maximum number of retries - """ - n_retries = 0 - file_exists = False - while n_retries < max_retries and not file_exists: - asf_search.download_url(url, path, filename, session) - - n_retries += 1 - if Path(path, filename).exists(): - file_exists = True - - if not file_exists: - raise ValueError(f'Failed to download {filename} after {max_retries} attempts.') - - def flatten(list_of_lists: List[List]) -> List: """Flatten a list of lists.""" return [item for sublist in list_of_lists for item in sublist] From 2f22c7d99a0e22ed5e3316867282d760099e6f86 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Sat, 4 Jan 2025 13:39:32 -0600 Subject: [PATCH 34/41] remove download tests --- src/burst2safe/download.py | 17 +++++++---------- tests/test_download.py | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 10 deletions(-) create mode 100644 tests/test_download.py diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py index ac9895b..96a426f 100644 --- a/src/burst2safe/download.py +++ b/src/burst2safe/download.py @@ -35,13 +35,12 @@ def get_url_dict(burst_infos: Iterable[BurstInfo], force: bool = False) -> dict: @retry( reraise=True, retry=retry_if_result(lambda r: r.status == 202), wait=wait_random(0, 1), stop=stop_after_delay(120) ) -async def get_async(session: aiohttp.ClientSession, url: str, max_redirects: int = 5) -> aiohttp.ClientResponse: +async def get_async(session: aiohttp.ClientSession, url: str) -> aiohttp.ClientResponse: """Retry a GET request until a non-202 response is received Args: session: An aiohttp ClientSession url: The URL to download - max_redirects: The maximum number of redirects to follow Returns: The response object @@ -53,7 +52,7 @@ async def get_async(session: aiohttp.ClientSession, url: str, max_redirects: int @retry(reraise=True, stop=stop_after_attempt(3)) async def download_burst_url_async(session: aiohttp.ClientSession, url: str, file_path: Path) -> None: - """Retry a GET request until a non-202 response is received, then download data. + """Retry a burst URL GET request until a non-202 response is received, then download the file. 
Args: session: An aiohttp ClientSession @@ -61,16 +60,15 @@ async def download_burst_url_async(session: aiohttp.ClientSession, url: str, fil file_path: The path to save the downloaded data to """ response = await get_async(session, url) - assert response.status == 200 if file_path.suffix in ['.tif', '.tiff']: returned_filename = response.content_disposition.filename elif file_path.suffix == '.xml': url_parts = str(response.url).split('/') - ext = response.content_disposition.filename.split('.')[-1] - returned_filename = f'{url_parts[3]}_{url_parts[5]}.{ext}' + returned_filename = f'{url_parts[3]}_{url_parts[5]}.xml' else: raise ValueError(f'Invalid file extension: {file_path.suffix}') + if file_path.name != returned_filename: raise ValueError(f'Race condition encountered, incorrect url returned for file: {file_path.name}') @@ -78,12 +76,11 @@ async def download_burst_url_async(session: aiohttp.ClientSession, url: str, fil with open(file_path, 'wb') as f: async for chunk in response.content.iter_chunked(2**14): f.write(chunk) - response.close() except Exception as e: - response.close() - if file_path.exists(): - file_path.unlink() + file_path.unlink(missing_ok=True) raise e + finally: + response.close() async def download_bursts_async(url_dict: dict) -> None: diff --git a/tests/test_download.py b/tests/test_download.py new file mode 100644 index 0000000..a4a4ad8 --- /dev/null +++ b/tests/test_download.py @@ -0,0 +1,34 @@ +from collections import namedtuple + +from burst2safe import download + + +def test_get_url_dict(tmp_path): + DummyBurst = namedtuple('DummyBurst', ['data_path', 'data_url', 'metadata_path', 'metadata_url']) + burst_infos = [ + DummyBurst( + data_path=tmp_path / 'data1.tif', + data_url='http://data1.tif', + metadata_path=tmp_path / 'metadata1.xml', + metadata_url='http://metadata1.xml', + ), + DummyBurst( + data_path=tmp_path / 'data2.tiff', + data_url='http://data2.tiff', + metadata_path=tmp_path / 'metadata2.xml', + metadata_url='http://metadata2.xml', + ), + ] + url_dict = download.get_url_dict(burst_infos) + expected = { + tmp_path / 'data1.tif': 'http://data1.tif', + tmp_path / 'metadata1.xml': 'http://metadata1.xml', + tmp_path / 'data2.tiff': 'http://data2.tiff', + tmp_path / 'metadata2.xml': 'http://metadata2.xml', + } + assert url_dict == expected + + del expected[tmp_path / 'data1.tif'] + (tmp_path / 'data1.tif').touch() + url_dict = download.get_url_dict(burst_infos) + assert url_dict == expected From 67b89bf168049918f91856611881f22d436ba363 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Sat, 4 Jan 2025 14:10:03 -0600 Subject: [PATCH 35/41] refactor search --- src/burst2safe/search.py | 100 +++++++++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 30 deletions(-) diff --git a/src/burst2safe/search.py b/src/burst2safe/search.py index c240c38..f0007a9 100644 --- a/src/burst2safe/search.py +++ b/src/burst2safe/search.py @@ -3,7 +3,7 @@ from datetime import datetime from itertools import product from pathlib import Path -from typing import List, Optional +from typing import Dict, List, Optional, Tuple import asf_search import numpy as np @@ -108,32 +108,18 @@ def get_burst_group( return search_results -def find_group( - orbit: int, - footprint: Polygon, - polarizations: Optional[Iterable] = None, - swaths: Optional[Iterable] = None, - mode: str = 'IW', - min_bursts: int = 1, - start_date: Optional[datetime] = None, - end_date: Optional[datetime] = None, - use_relative_orbit: bool = False, -) -> List[S1BurstProduct]: - """Find burst groups 
using ASF Search. +def sanitize_group_search_inputs( + polarizations: Optional[Iterable] = None, swaths: Optional[Iterable] = None, mode: str = 'IW' +) -> Tuple[List[str], List[str]]: + """Sanitize inputs for group search. Args: - orbit: The absolute orbit number of the bursts - footprint: The bounding box of the bursts polarizations: List of polarizations to include (default: VV) swaths: List of swaths to include (default: all) mode: The collection mode to use (IW or EW) (default: IW) - min_bursts: The minimum number of bursts per swath (default: 1) - start_date: The start date for relative orbit search - end_date: The end date for relative orbit search - use_relative_orbit: Use relative orbit number instead of absolute orbit number (default: False) Returns: - A list of S1BurstProduct objects + A tuple of sanitized polarizations and swaths """ if polarizations is None: polarizations = ['VV'] @@ -155,9 +141,72 @@ def find_group( if bad_swaths: raise ValueError(f'Invalid swaths: {" ".join(bad_swaths)}') + return polarizations, swaths + + +def add_missing_bursts( + search_results: List[S1BurstProduct], + polarizations: List[str], + swaths: List[str], + min_bursts: int, + use_relative_orbit: bool, +) -> List[S1BurstProduct]: + """Add missing bursts to the search results to ensure each swath/pol combo has at least `min_bursts` bursts. + + Args: + search_results: A list of S1BurstProduct objects + polarizations: List of polarizations to include + swaths: List of swaths to include + min_bursts: The minimum number of bursts per swath (default: 1) + use_relative_orbit: Use relative orbit number instead of absolute orbit number (default: False) + + Returns: + A list of S1BurstProduct objects + """ + grouped_results = [] + if use_relative_orbit: + absolute_orbits = list(set([int(result.properties['orbit']) for result in search_results])) + group_definitions = product(polarizations, swaths, absolute_orbits) + else: + group_definitions = product(polarizations, swaths) + + for group_definition in group_definitions: + sub_results = get_burst_group(search_results, *group_definition, min_bursts=min_bursts) + grouped_results.extend(sub_results) + return grouped_results + + +def find_group( + orbit: int, + footprint: Polygon, + polarizations: Optional[Iterable] = None, + swaths: Optional[Iterable] = None, + mode: str = 'IW', + min_bursts: int = 1, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + use_relative_orbit: bool = False, +) -> List[S1BurstProduct]: + """Find burst groups using ASF Search. 
+ + Args: + orbit: The absolute orbit number of the bursts + footprint: The bounding box of the bursts + polarizations: List of polarizations to include (default: VV) + swaths: List of swaths to include (default: all) + mode: The collection mode to use (IW or EW) (default: IW) + min_bursts: The minimum number of bursts per swath (default: 1) + start_date: The start date for relative orbit search + end_date: The end date for relative orbit search + use_relative_orbit: Use relative orbit number instead of absolute orbit number (default: False) + + Returns: + A list of S1BurstProduct objects + """ if use_relative_orbit and not (start_date and end_date): raise ValueError('You must provide start and end dates when using relative orbit number.') + polarizations, swaths = sanitize_group_search_inputs(polarizations, swaths, mode) opts = dict(dataset=asf_search.constants.DATASET.SLC_BURST, intersectsWith=footprint.wkt, beamMode=mode) if use_relative_orbit: opts['relativeOrbit'] = orbit @@ -167,16 +216,7 @@ def find_group( opts['absoluteOrbit'] = orbit search_results = asf_search.geo_search(**opts) - final_results = [] - if use_relative_orbit: - absolute_orbits = list(set([int(result.properties['orbit']) for result in search_results])) - group_definitions = product(polarizations, swaths, absolute_orbits) - else: - group_definitions = product(polarizations, swaths) - - for group_definition in group_definitions: - sub_results = get_burst_group(search_results, *group_definition, min_bursts=min_bursts) - final_results.extend(sub_results) + final_results = add_missing_bursts(search_results, polarizations, swaths, min_bursts, use_relative_orbit) return final_results From 8dd7baf710e90fd7b72ed50cb4ca09f5b7802542 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Sat, 4 Jan 2025 14:13:29 -0600 Subject: [PATCH 36/41] remove unused type --- src/burst2safe/search.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/burst2safe/search.py b/src/burst2safe/search.py index f0007a9..74c7de8 100644 --- a/src/burst2safe/search.py +++ b/src/burst2safe/search.py @@ -3,7 +3,7 @@ from datetime import datetime from itertools import product from pathlib import Path -from typing import Dict, List, Optional, Tuple +from typing import List, Optional, Tuple import asf_search import numpy as np From be788186762d5756195af9e2022f034ba4c7d221 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Sat, 4 Jan 2025 14:24:07 -0600 Subject: [PATCH 37/41] add search test --- tests/test_search.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/test_search.py b/tests/test_search.py index 6ff5bf0..fbb0183 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -34,3 +34,21 @@ def test_add_surrounding_bursts(product): mock_search.assert_called_once_with( dataset='SLC-BURST', absoluteOrbit=1, polarization='VV', fullBurstID=burst_ids ) + + +def test_sanitize_group_search_inputs(): + pols, swaths = search.sanitize_group_search_inputs() + assert pols == ['VV'] + assert swaths == [None] + + assert search.sanitize_group_search_inputs(polarizations=['HH'])[0] == ['HH'] + assert search.sanitize_group_search_inputs(swaths=['IW2'])[1] == ['IW2'] + + with pytest.raises(ValueError, match='Invalid polarization*'): + search.sanitize_group_search_inputs(polarizations=['VV', 'BB']) + + with pytest.raises(ValueError, match='Invalid swath*'): + search.sanitize_group_search_inputs(swaths=['IW1'], mode='EW') + + with pytest.raises(ValueError, match='Invalid swath*'): + 
search.sanitize_group_search_inputs(swaths=['EW1'], mode='IW') From 17532e631e60350c340197b2991076801e456a2f Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Mon, 6 Jan 2025 07:12:44 -0600 Subject: [PATCH 38/41] fix xml check --- src/burst2safe/download.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/burst2safe/download.py b/src/burst2safe/download.py index 96a426f..521043f 100644 --- a/src/burst2safe/download.py +++ b/src/burst2safe/download.py @@ -65,7 +65,8 @@ async def download_burst_url_async(session: aiohttp.ClientSession, url: str, fil returned_filename = response.content_disposition.filename elif file_path.suffix == '.xml': url_parts = str(response.url).split('/') - returned_filename = f'{url_parts[3]}_{url_parts[5]}.xml' + ext = response.content_disposition.filename.split('.')[-1] + returned_filename = f'{url_parts[3]}_{url_parts[5]}.{ext}' else: raise ValueError(f'Invalid file extension: {file_path.suffix}') From f1f08eb26606c5acc1f75c00e9ba21af2c5e8087 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Mon, 6 Jan 2025 07:15:16 -0600 Subject: [PATCH 39/41] update readme --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index bdfcd81..125d40d 100644 --- a/README.md +++ b/README.md @@ -29,12 +29,14 @@ conda install -c conda-forge burst2safe ### Credentials To use `burst2safe`, you must provide your Earthdata Login credentials via two environment variables -(`EARTHDATA_USERNAME` and `EARTHDATA_PASSWORD`), or via your `.netrc` file. +(`EARTHDATA_USERNAME` and `EARTHDATA_PASSWORD`), or via your `.netrc` file. Alternatively, you can use an Earthdata Login stored in the `EARTHDATA_TOKEN` environment variable. If you do not already have an Earthdata account, you can sign up [here](https://urs.earthdata.nasa.gov/home). If you would like to set up Earthdata Login via your `.netrc` file, check out this [guide](https://harmony.earthdata.nasa.gov/docs#getting-started) to get started. +If you would like to set up Earthdata Login via a token, check out this [guide](https://urs.earthdata.nasa.gov/documentation/for_users/user_token) to get started. + ## burst2safe usage The `burst2safe` command line tool can be run using the following structure: ```bash From 752e420360e1a3b2a09c715172ba530b51bc50c8 Mon Sep 17 00:00:00 2001 From: Forrest Williams Date: Mon, 6 Jan 2025 07:32:45 -0600 Subject: [PATCH 40/41] update readme 2 --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 125d40d..8be34fa 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ conda install -c conda-forge burst2safe ### Credentials To use `burst2safe`, you must provide your Earthdata Login credentials via two environment variables -(`EARTHDATA_USERNAME` and `EARTHDATA_PASSWORD`), or via your `.netrc` file. Alternatively, you can use an Earthdata Login stored in the `EARTHDATA_TOKEN` environment variable. +(`EARTHDATA_USERNAME` and `EARTHDATA_PASSWORD`), or via your `.netrc` file. Alternatively, you can use an Earthdata Login Token stored in the `EARTHDATA_TOKEN` environment variable. If you do not already have an Earthdata account, you can sign up [here](https://urs.earthdata.nasa.gov/home). @@ -37,6 +37,8 @@ If you would like to set up Earthdata Login via your `.netrc` file, check out th If you would like to set up Earthdata Login via a token, check out this [guide](https://urs.earthdata.nasa.gov/documentation/for_users/user_token) to get started. 
+Note that `burst2safe` will prefer authorization information in this order: token > .netrc > username/password in environment. So if you have both a .netrc file and a token configured, it will use the token.
+
 ## burst2safe usage
 The `burst2safe` command line tool can be run using the following structure:
 ```bash

From d42e37718408dbeba90b2176e73da5bdc849e5b6 Mon Sep 17 00:00:00 2001
From: Forrest Williams
Date: Mon, 6 Jan 2025 07:36:10 -0600
Subject: [PATCH 41/41] update changelog

---
 CHANGELOG.md | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index aba4c02..3de0950 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,12 +9,13 @@ and uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 ## [1.4.0]
 
 ### Added
-* Downloads.py to support asynchronous downloads.
+* download.py to support asynchronous downloads.
 * Support for EDL token based authentication.
 
 ### Changed
-* Authorization behavior so that EDL credentials from a user's netrc are prioritized. Now writes credentials to the netrc if they are provided as environment variables.
-* Switches to asynchronous download approach.
+* Authorization behavior so that EDL credentials from an EDL token are prioritized over a username/password in either a netrc or the environment.
+* Authorization behavior so that an EDL username/password from a user's netrc is prioritized over credentials set via environment variables. Now writes the username/password to the netrc if they are provided as environment variables.
+* Switched to an asynchronous download approach.
 * In burst2stack.py all input files are now downloaded first.
 
 ## [1.3.1]
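
The core pattern behind the download rework in patches 34 and 38: ASF answers HTTP 202 while a burst is still being extracted, so each GET is retried until a non-202 response arrives, and a partially written file is deleted if the transfer fails. Below is a minimal, self-contained sketch of that pattern using aiohttp and tenacity as in the patches; `poll_until_ready`, `download_when_ready`, and the URL in the usage comment are illustrative names and values, not part of burst2safe.

```python
import asyncio
from pathlib import Path

import aiohttp
from tenacity import retry, retry_if_result, stop_after_delay, wait_random


@retry(
    reraise=True,
    retry=retry_if_result(lambda r: r.status == 202),  # 202 means "still extracting"; try again
    wait=wait_random(0, 1),
    stop=stop_after_delay(120),
)
async def poll_until_ready(session: aiohttp.ClientSession, url: str) -> aiohttp.ClientResponse:
    # Illustrative sketch of the pattern in download.py, not burst2safe's API
    response = await session.get(url)
    response.raise_for_status()
    return response


async def download_when_ready(url: str, file_path: Path) -> None:
    async with aiohttp.ClientSession() as session:
        response = await poll_until_ready(session, url)
        try:
            with open(file_path, 'wb') as f:
                async for chunk in response.content.iter_chunked(2**14):
                    f.write(chunk)
        except Exception:
            file_path.unlink(missing_ok=True)  # don't leave a partial file behind
            raise
        finally:
            response.close()


# asyncio.run(download_when_ready('https://example.com/burst.tiff', Path('burst.tiff')))
```

tenacity applies the same retry policy to coroutines directly, which is why one decorator style serves both the 202 polling and the whole-file `stop_after_attempt(3)` wrapper in the patches.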
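After the patch 35 refactor, `find_group` reduces to input sanitization, one ASF geo search, and a grouping pass that enforces `min_bursts` per swath/polarization combination. A hypothetical call is sketched below, assuming shapely is available; the orbit number and footprint are made-up values, not a known scene.

```python
from shapely.geometry import box

from burst2safe.search import find_group

# Values below are illustrative only
footprint = box(-117.5, 35.0, -117.0, 35.5)  # lon/lat bounding box
results = find_group(
    orbit=50000,  # absolute orbit number (made up)
    footprint=footprint,
    polarizations=['VV'],
    swaths=['IW1', 'IW2'],
    mode='IW',
    min_bursts=2,
)
print(f'Found {len(results)} bursts')
```

When `use_relative_orbit=True`, `start_date` and `end_date` must also be supplied, or `find_group` raises a `ValueError`, per the check retained in the refactor.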
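The README note in patch 40 documents the credential precedence as token > .netrc > username/password in the environment. A sketch of that resolution order follows, assuming the standard Earthdata Login host; `resolve_earthdata_auth` is an illustrative helper, not burst2safe's actual `get_earthdata_credentials`.

```python
import netrc
import os


def resolve_earthdata_auth(host: str = 'urs.earthdata.nasa.gov'):
    """Illustrative only: return (method, credentials) following token > .netrc > environment."""
    token = os.environ.get('EARTHDATA_TOKEN')
    if token:
        return 'token', token

    try:
        auth = netrc.netrc().authenticators(host)
    except (FileNotFoundError, netrc.NetrcParseError):
        auth = None
    if auth is not None:
        username, _, password = auth
        return 'netrc', (username, password)

    username = os.environ.get('EARTHDATA_USERNAME')
    password = os.environ.get('EARTHDATA_PASSWORD')
    if username and password:
        return 'environment', (username, password)

    raise ValueError('No Earthdata Login credentials found.')
```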