diff --git a/src/python/gudhi/representations/vector_methods.py b/src/python/gudhi/representations/vector_methods.py index 36f445884c..14d2802e71 100644 --- a/src/python/gudhi/representations/vector_methods.py +++ b/src/python/gudhi/representations/vector_methods.py @@ -756,7 +756,7 @@ def __init__( self, quantiser=KMeans(n_clusters=2, n_init="auto"), weighting_method="cloud", - contrast="gaussian" + contrast="gaussian", ): """ Constructor for the Atol measure vectorisation class. @@ -794,7 +794,8 @@ def get_weighting_method(self): def fit(self, X, y=None, sample_weight=None): """ - Calibration step: fit centers to the sample measures and derive inertias between centers. + Calibration step: fit centers to the target sample measures and derive inertias between centers. If the target + does not contain enough points for creating the intended number of centers, we fill in with bogus centers. Parameters: X (list N x d numpy arrays): input measures in R^d from which to learn center locations and inertias @@ -806,25 +807,32 @@ def fit(self, X, y=None, sample_weight=None): Returns: self """ - if not hasattr(self.quantiser, 'fit'): - raise TypeError("quantiser %s has no `fit` attribute." % (self.quantiser)) - - # In fitting we remove infinite death time points so that every center is finite - X = [dgm[~np.isinf(dgm).any(axis=1), :] for dgm in X] + n_clusters = self.quantiser.n_clusters + if not len(X): + raise ValueError("Cannot fit Atol on empty target.") + measures_concat = np.concatenate(X) if sample_weight is None: sample_weight = [self.get_weighting_method()(measure) for measure in X] - - measures_concat = np.concatenate(X) weights_concat = np.concatenate(sample_weight) - self.quantiser.fit(X=measures_concat, sample_weight=weights_concat) + # In fitting we remove infinite birth/death time points so that every center is finite. We do not care about duplicates. 
+ filtered_measures_concat = measures_concat[~np.isinf(measures_concat).any(axis=1), :] if len(measures_concat) else measures_concat + filtered_weights_concat = weights_concat[~np.isinf(measures_concat).any(axis=1)] if len(measures_concat) else weights_concat + n_points = len(filtered_measures_concat) + if not n_points: + raise ValueError("Cannot fit Atol on measure with infinite components only.") + if n_points < n_clusters: + self.quantiser.n_clusters = n_points + + self.quantiser.fit(X=filtered_measures_concat, sample_weight=filtered_weights_concat) self.centers = self.quantiser.cluster_centers_ + # Hack, but some people are unhappy if the order depends on the version of sklearn self.centers = self.centers[np.lexsort(self.centers.T)] if self.quantiser.n_clusters == 1: - dist_centers = pairwise.pairwise_distances(measures_concat) + dist_centers = pairwise.pairwise_distances(filtered_measures_concat) np.fill_diagonal(dist_centers, 0) best_inertia = np.max(dist_centers)/2 if np.max(dist_centers)/2 > 0 else 1 self.inertias = np.array([best_inertia]) @@ -832,6 +840,15 @@ def fit(self, X, y=None, sample_weight=None): dist_centers = pairwise.pairwise_distances(self.centers) dist_centers[dist_centers == 0] = np.inf self.inertias = np.min(dist_centers, axis=0)/2 + + if n_points < n_clusters: + # There weren't enough points to fit n_clusters, so we arbitrarily put centers as [np.inf]^measure_dim.
+ print(f"[Atol] after filtering had only {n_points=} to fit {n_clusters=}, adding meaningless centers.") + fill_center = np.repeat(np.inf, repeats=X[0].shape[1]) + fill_inertia = 0 + self.centers = np.concatenate([self.centers, np.repeat([fill_center], repeats=n_clusters-n_points, axis=0)]) + self.inertias = np.concatenate([self.inertias, np.repeat(fill_inertia, repeats=n_clusters-n_points)]) + self.quantiser.n_clusters = n_clusters return self def __call__(self, measure, sample_weight=None): diff --git a/src/python/test/test_representations_interface.py b/src/python/test/test_representations_interface.py new file mode 100644 index 0000000000..d22f412939 --- /dev/null +++ b/src/python/test/test_representations_interface.py @@ -0,0 +1,85 @@ +# The following tests only check that the program runs, not what it outputs + +import numpy as np + +from sklearn.base import clone +from sklearn.cluster import KMeans + +from gudhi.representations import (Atol, Landscape, Silhouette, BettiCurve, ComplexPolynomial, \ + TopologicalVector, PersistenceImage, Entropy) + +vectorizers = { + "atol": Atol(quantiser=KMeans(n_clusters=2, random_state=202312, n_init="auto")), + # "betti": BettiCurve(), +} + +diag1 = [np.array([[0., np.inf], + [0., 8.94427191], + [0., 7.28010989], + [0., 6.08276253], + [0., 5.83095189], + [0., 5.38516481], + [0., 5.]]), + np.array([[11., np.inf], + [6.32455532, 6.70820393]]), + np.empty(shape=[0, 2])] + +diag2 = [np.array([[0., np.inf], + [0., 8.94427191], + [0., 7.28010989], + [0., 6.08276253], + [0., 5.83095189], + [0., 5.38516481], + [0., 5.]]), + np.array([[11., np.inf], + [6.32455532, 6.70820393]]), + np.array([[0., np.inf], + [0., 1]])] + +diag3 = [np.empty(shape=[0, 2])] + + +def test_fit(): + print(f" > Testing `fit`.") + for name, vectorizer in vectorizers.items(): + print(f" >> Testing {name}") + clone(vectorizer).fit(X=[diag1[0], diag2[0]]) + + +def test_transform(): + print(f" > Testing `transform`.") + for name, vectorizer in 
vectorizers.items(): + print(f" >> Testing {name}") + clone(vectorizer).fit_transform(X=[diag1[0], diag2[0], diag3[0]]) + + +def test_transform_empty(): + print(f" > Testing `transform_empty`.") + for name, vectorizer in vectorizers.items(): + print(f" >> Testing {name}") + copy_vec = clone(vectorizer).fit(X=[diag1[0], diag2[0]]) + copy_vec.transform(X=[diag3[0], diag3[0]]) + + +def test_set_output(): + print(f" > Testing `set_output`.") + try: + import pandas + for name, vectorizer in vectorizers.items(): + print(f" >> Testing {name}") + clone(vectorizer).set_output(transform="pandas") + except ImportError: + print("Missing pandas, skipping set_output test") + + +def test_compose(): + print(f" > Testing composition with `sklearn.compose.ColumnTransformer`.") + from sklearn.compose import ColumnTransformer + for name, vectorizer in vectorizers.items(): + print(f" >> Testing {name}") + ct = ColumnTransformer([ + (f"{name}-0", clone(vectorizer), 0), + (f"{name}-1", clone(vectorizer), 1), + (f"{name}-2", clone(vectorizer), 2)] + ) + ct.fit_transform(X=[diag1, diag2])