Skip to content

Commit

Permalink
Merge pull request #1096 from martinroyer/fixatol
Browse files Browse the repository at this point in the history
Some robustifying fixes on Atol fit and tests for vectorizers
  • Loading branch information
VincentRouvreau authored Jul 29, 2024
2 parents b4977a2 + 040fb54 commit e6227ae
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 11 deletions.
39 changes: 28 additions & 11 deletions src/python/gudhi/representations/vector_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ def __init__(
self,
quantiser=KMeans(n_clusters=2, n_init="auto"),
weighting_method="cloud",
contrast="gaussian"
contrast="gaussian",
):
"""
Constructor for the Atol measure vectorisation class.
Expand Down Expand Up @@ -794,7 +794,8 @@ def get_weighting_method(self):

def fit(self, X, y=None, sample_weight=None):
"""
Calibration step: fit centers to the sample measures and derive inertias between centers.
Calibration step: fit centers to the target sample measures and derive inertias between centers. If the target
does not contain enough points for creating the intended number of centers, we fill in with bogus centers.
Parameters:
X (list N x d numpy arrays): input measures in R^d from which to learn center locations and inertias
Expand All @@ -806,32 +807,48 @@ def fit(self, X, y=None, sample_weight=None):
Returns:
self
"""
if not hasattr(self.quantiser, 'fit'):
raise TypeError("quantiser %s has no `fit` attribute." % (self.quantiser))

# In fitting we remove infinite death time points so that every center is finite
X = [dgm[~np.isinf(dgm).any(axis=1), :] for dgm in X]
n_clusters = self.quantiser.n_clusters

if not len(X):
raise ValueError("Cannot fit Atol on empty target.")
measures_concat = np.concatenate(X)
if sample_weight is None:
sample_weight = [self.get_weighting_method()(measure) for measure in X]

measures_concat = np.concatenate(X)
weights_concat = np.concatenate(sample_weight)

self.quantiser.fit(X=measures_concat, sample_weight=weights_concat)
# In fitting we remove infinite birth/death time points so that every center is finite. We do not care about duplicates.
filtered_measures_concat = measures_concat[~np.isinf(measures_concat).any(axis=1), :] if len(measures_concat) else measures_concat
filtered_weights_concat = weights_concat[~np.isinf(measures_concat).any(axis=1)] if len(measures_concat) else weights_concat

n_points = len(filtered_measures_concat)
if not n_points:
raise ValueError("Cannot fit Atol on measure with infinite components only.")
if n_points < n_clusters:
self.quantiser.n_clusters = n_points

self.quantiser.fit(X=filtered_measures_concat, sample_weight=filtered_weights_concat)
self.centers = self.quantiser.cluster_centers_

# Hack, but some people are unhappy if the order depends on the version of sklearn
self.centers = self.centers[np.lexsort(self.centers.T)]
if self.quantiser.n_clusters == 1:
dist_centers = pairwise.pairwise_distances(measures_concat)
dist_centers = pairwise.pairwise_distances(filtered_measures_concat)
np.fill_diagonal(dist_centers, 0)
best_inertia = np.max(dist_centers)/2 if np.max(dist_centers)/2 > 0 else 1
self.inertias = np.array([best_inertia])
else:
dist_centers = pairwise.pairwise_distances(self.centers)
dist_centers[dist_centers == 0] = np.inf
self.inertias = np.min(dist_centers, axis=0)/2

if n_points < n_clusters:
# There weren't enough points to fit n_clusters, so we arbitrarily put the missing centers at [np.inf]^measure_dim.
print(f"[Atol] after filtering had only {n_points=} to fit {n_clusters=}, adding meaningless centers.")
fill_center = np.repeat(np.inf, repeats=X[0].shape[1])
fill_inertia = 0
self.centers = np.concatenate([self.centers, np.repeat([fill_center], repeats=n_clusters-n_points, axis=0)])
self.inertias = np.concatenate([self.inertias, np.repeat(fill_inertia, repeats=n_clusters-n_points)])
self.quantiser.n_clusters = n_clusters
return self

def __call__(self, measure, sample_weight=None):
Expand Down
85 changes: 85 additions & 0 deletions src/python/test/test_representations_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# The following tests only check that the program runs, not what it outputs

import numpy as np

from sklearn.base import clone
from sklearn.cluster import KMeans

from gudhi.representations import (Atol, Landscape, Silhouette, BettiCurve, ComplexPolynomial, \
TopologicalVector, PersistenceImage, Entropy)

# Estimators exercised by every test in this file, keyed by the short label
# that the tests print.  The KMeans quantiser uses a pinned random_state.
vectorizers = dict(
    atol=Atol(quantiser=KMeans(n_clusters=2, random_state=202312, n_init="auto")),
    # betti=BettiCurve(),
)

def _diagram(*points):
    """Return the given (birth, death) rows as an (n, 2) float array.

    With no rows the result is an empty array of shape (0, 2).
    """
    if not points:
        return np.empty(shape=[0, 2])
    return np.array(points, dtype=float)


def _shared_diagrams():
    """Return fresh copies of the two diagrams that open both diag1 and diag2."""
    return [
        _diagram(
            (0.0, np.inf),
            (0.0, 8.94427191),
            (0.0, 7.28010989),
            (0.0, 6.08276253),
            (0.0, 5.83095189),
            (0.0, 5.38516481),
            (0.0, 5.0),
        ),
        _diagram((11.0, np.inf), (6.32455532, 6.70820393)),
    ]


# Both fixtures start with the same two diagrams (each containing one point
# with an infinite coordinate); diag1 ends with an empty diagram, diag2 with a
# small non-empty one.
diag1 = _shared_diagrams() + [_diagram()]
diag2 = _shared_diagrams() + [_diagram((0.0, np.inf), (0.0, 1))]

# A fixture holding a single empty diagram.
diag3 = [_diagram()]


def test_fit():
    """Smoke test: every registered vectorizer can be fitted without raising."""
    print(" > Testing `fit`.")
    for label, estimator in vectorizers.items():
        print(f" >> Testing {label}")
        # Fit a fresh clone so the shared module-level estimator stays unfitted.
        training_batch = [diag1[0], diag2[0]]
        clone(estimator).fit(X=training_batch)


def test_transform():
    """Smoke test: ``fit_transform`` runs on a batch that includes an empty diagram."""
    print(" > Testing `transform`.")
    for label, estimator in vectorizers.items():
        print(f" >> Testing {label}")
        # The last entry (diag3[0]) is the empty (0, 2) array.
        batch = [diag1[0], diag2[0], diag3[0]]
        clone(estimator).fit_transform(X=batch)


def test_transform_empty():
    """Smoke test: a fitted vectorizer can transform a batch of empty diagrams only."""
    print(" > Testing `transform_empty`.")
    for label, estimator in vectorizers.items():
        print(f" >> Testing {label}")
        # Fit on non-empty data first, then transform nothing but empty diagrams.
        fitted = clone(estimator).fit(X=[diag1[0], diag2[0]])
        empty_diagram = diag3[0]
        fitted.transform(X=[empty_diagram, empty_diagram])


def test_set_output():
    """Smoke test: ``set_output(transform="pandas")`` is accepted by every vectorizer.

    When pandas is not installed the test prints a notice and returns without
    exercising anything.
    """
    print(" > Testing `set_output`.")
    try:
        # Only the availability probe is guarded: an ImportError raised later,
        # while exercising a vectorizer, must fail the test instead of being
        # misreported as "pandas is missing".
        import pandas  # noqa: F401
    except ImportError:
        print("Missing pandas, skipping set_output test")
        return

    for name, vectorizer in vectorizers.items():
        print(f" >> Testing {name}")
        clone(vectorizer).set_output(transform="pandas")


def test_compose():
    """Smoke test: vectorizers can be composed in a ``ColumnTransformer``.

    One fresh clone of the vectorizer is assigned to each of the columns
    0, 1 and 2 of the input, and the composite is run on ``[diag1, diag2]``.
    """
    print(" > Testing composition with `sklearn.compose.ColumnTransformer`.")
    from sklearn.compose import ColumnTransformer

    for label, estimator in vectorizers.items():
        print(f" >> Testing {label}")
        per_column = [
            (f"{label}-{column}", clone(estimator), column)
            for column in range(3)
        ]
        composite = ColumnTransformer(per_column)
        composite.fit_transform(X=[diag1, diag2])

0 comments on commit e6227ae

Please sign in to comment.