Skip to content

Commit

Permalink
enh: perform basin availability checks in daemon thread
Browse files: browse the repository at this point in the history
  • Loading branch information
paulmueller committed Jan 1, 2024
1 parent 0804911 commit 3ca9ca3
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 33 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
0.56.2
- enh: perform basin availability checks in daemon thread
0.56.1
- enh: priority-based basin sorting (file over remote, http over dcor)
0.56.0
Expand Down
15 changes: 9 additions & 6 deletions dclab/rtdc_dataset/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,7 @@ def _feature_candidates(self):
feats = list(self._events.keys())
feats += list(self._usertemp.keys())
feats += list(AncillaryFeature.feature_names)
for bn in self.basins:
if bn.is_available():
feats += bn.features
feats += self.features_basin
feats = sorted(set(feats))
# exclude non-standard features
featsv = [ff for ff in feats if dfn.feature_exists(ff)]
Expand Down Expand Up @@ -401,6 +399,11 @@ def features_basin(self):
if self.basins:
features = []
for bn in self.basins:
if bn.features and set(bn.features) <= set(features):
# We already have the features from a different basin.
# There might be a basin availability check going on
# somewhere, but we are not interested in it.
continue
if bn.is_available():
features += bn.features
return sorted(set(features))
Expand Down Expand Up @@ -429,8 +432,8 @@ def features_loaded(self):
features_loaded = []
for feat in self.features:
if (feat in features_innate
or feat in FEATURES_RAPID
or feat in self._usertemp
or feat in FEATURES_RAPID
or feat in self._usertemp
or feat in self._ancillaries):
# Note that there is no hash checking here for
# ancillary features. This might be interesting
Expand Down Expand Up @@ -732,7 +735,7 @@ def basins_retrieve(self):
"features": bdict.get("features"),
# Make sure the measurement identifier is checked.
"measurement_identifier": self.get_measurement_identifier(),
}
}

if bdict["type"] == "file":
for pp in bdict["paths"]:
Expand Down
16 changes: 16 additions & 0 deletions dclab/rtdc_dataset/feat_basin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,20 @@
from __future__ import annotations

import abc
import threading
from typing import Dict


class BasinAvailabilityChecker(threading.Thread):
    """Daemon thread that triggers the availability check of a basin

    The thread calls ``basin.is_available()`` exactly once in :meth:`run`
    and discards the return value. Because the thread is created with
    ``daemon=True``, a check that is still in progress does not keep the
    interpreter alive on exit.

    Parameters
    ----------
    basin:
        Object exposing an ``is_available()`` method (presumably a
        :class:`Basin` instance that caches the outcome of the check;
        verify against the concrete basin implementation).
    *args, **kwargs:
        Forwarded unchanged to :class:`threading.Thread`. Do not pass
        ``daemon``; it is always set to ``True`` here and passing it
        again raises a ``TypeError``.
    """

    def __init__(self, basin, *args, **kwargs):
        # Always run as daemon thread so that pending checks never
        # block interpreter shutdown.
        super().__init__(*args, daemon=True, **kwargs)
        #: the basin whose availability is checked by this thread
        self.basin = basin

    def run(self):
        """Perform the availability check (executed in the new thread)"""
        # The return value is intentionally ignored; callers obtain the
        # result by calling `is_available` on the basin themselves.
        self.basin.is_available()


class Basin(abc.ABC):
"""A basin represents data from an external source
Expand Down Expand Up @@ -61,6 +72,10 @@ def __init__(self, location, name=None, description=None,
#: additional keyword arguments passed to the basin
self.kwargs = kwargs
self._ds = None
# perform availability check in separate thread
self._av_check_lock = threading.Lock()
self._av_check = BasinAvailabilityChecker(self)
self._av_check.start()

def _assert_measurement_identifier(self):
"""Make sure the basin matches the measurement identifier
Expand Down Expand Up @@ -140,6 +155,7 @@ def close(self):
"""Close any open file handles or connections"""
if self._ds is not None:
self._ds.close()
self._av_check.join(0.5)

def get_feature_data(self, feat):
"""Return an object representing feature data of the basin"""
Expand Down
23 changes: 12 additions & 11 deletions dclab/rtdc_dataset/fmt_dcor/basin.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,19 @@ def is_available(self):
:class:`.APIHandler`. You can add tokens with
:func:`.APIHandler.add_api_key`.
"""
if not REQUESTS_AVAILABLE:
# don't even bother
self._available_verified = False
elif not is_full_dcor_url(self.location):
# not a full DCOR URL
self._available_verified = False
if self._available_verified is None:
api = APIHandler(self.location)
try:
self._available_verified = api.get("valid")
except DCORAccessError:
with self._av_check_lock:
if not REQUESTS_AVAILABLE:
# don't even bother
self._available_verified = False
elif not is_full_dcor_url(self.location):
# not a full DCOR URL
self._available_verified = False
if self._available_verified is None:
api = APIHandler(self.location)
try:
self._available_verified = api.get("valid")
except DCORAccessError:
self._available_verified = False
return self._available_verified


Expand Down
11 changes: 6 additions & 5 deletions dclab/rtdc_dataset/fmt_hdf5/basin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ def load_dataset(self, location, **kwargs):
return RTDC_HDF5(location, enable_basins=False, **kwargs)

def is_available(self):
avail = False
try:
avail = pathlib.Path(self.location).exists()
except OSError:
pass
with self._av_check_lock:
avail = False
try:
avail = pathlib.Path(self.location).exists()
except OSError:
pass
return avail
20 changes: 11 additions & 9 deletions dclab/rtdc_dataset/fmt_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,16 @@ def is_available(self):
Caching policy: Once this method returns True, it will always
return True.
"""
if not REQUESTS_AVAILABLE:
# don't even bother
self._available_verified = False
if self._available_verified is None:
avail, reason = is_url_available(self.location, ret_reason=True)
if reason in ["forbidden", "not found"]:
# we cannot access the URL in the near future
with self._av_check_lock:
if not REQUESTS_AVAILABLE:
# don't even bother
self._available_verified = False
elif avail:
self._available_verified = True
if self._available_verified is None:
avail, reason = is_url_available(self.location,
ret_reason=True)
if reason in ["forbidden", "not found"]:
# we cannot access the URL in the near future
self._available_verified = False
elif avail:
self._available_verified = True
return self._available_verified
5 changes: 3 additions & 2 deletions dclab/rtdc_dataset/fmt_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ def is_available(self):
Caching policy: Once this method returns True, it will always
return True.
"""
if not self._available_verified:
self._available_verified = (
with self._av_check_lock:
if not self._available_verified:
self._available_verified = (
S3FS_AVAILABLE and is_s3_object_available(self.location))
return self._available_verified

Expand Down

0 comments on commit 3ca9ca3

Please sign in to comment.