-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstatistical_testing.py
243 lines (211 loc) · 8.63 KB
/
statistical_testing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
"""
Convenience functions for statistical testing.
"""
import numpy as np
def get_single_FT_surrogate(ts, seed=None):
"""
Returns single 1D Fourier transform surrogate.
Theiler, J., Eubank, S., Longtin, A., Galdrikian, B., & Farmer, J. D.
(1992). Testing for nonlinearity in time series: the method of
surrogate data. Physica D: Nonlinear Phenomena, 58(1-4), 77-94.
:param ts: timeseries to transform as [time x N]
:type ts: np.ndarray
:param seed: seed for random number generator
:type seed: int|None
:return: 1D FT surrogate of timeseries
:rtype: np.ndarray
"""
np.random.seed(seed)
if ts.ndim == 1:
ts = ts[:, np.newaxis]
xf = np.fft.rfft(ts, axis=0)
angle = np.random.uniform(0, 2 * np.pi, (xf.shape[0],))[:, np.newaxis]
# set the slowest frequency to zero, i.e. not to be randomised
angle[0] = 0
cxf = xf * np.exp(1j * angle)
return np.fft.irfft(cxf, n=ts.shape[0], axis=0).squeeze()
def get_single_time_shift_surrogate(ts, seed=None):
"""
Return single 1D time shift surrogate. Timeseries is shifted in time
(assumed periodic in the sense that is wrapped in the end) by a random
amount of time. Useful surrogate for testing phase relationships.
:param ts: timeseries to transform as [time x N]
:type ts: np.ndarray
:param seed: seed for random number generator
:type seed: int|None
:return: 1D time shift surrogate of timeseries
:rtype: np.ndarray
"""
np.random.seed(seed)
roll = np.random.choice(ts.shape[0], 1)[0]
# assert roll is not 0 - would give the same timeseries
while roll == 0:
roll = np.random.choice(ts.shape[0], 1)[0]
return np.roll(ts, roll, axis=0)
def get_single_shuffle_surrogate(ts, cut_points=None, repeats=100, seed=None):
"""
Return single 1D shuffle surrogate. Timeseries is cut into `cut_points`
pieces at random and then shuffled. If `cut_points` is None, will cut each
point, hence whole timeseries is shuffled. Typical usage:
- just 1 cut_point, 200 - 1000 repeats
- cut at several points, fewer repeats
:param ts: timeseries to transform as [time x N]
:type ts: np.ndarray
:param cut_points: number of cutting points, timeseries will be partitioned
into n+1 partitions with n cut_points; if None, each point is its own
partition, i.e. classical shuffle surrogate
:type cut_points: int|None
:param repeats: how many times is the shuffling procedure repeated
:type repeats: int
:param seed: seed for random number generator
:type seed: int|None
:return: 1D shuffle surrogate of timeseries
:rtype: np.ndarray
"""
np.random.seed(seed)
if cut_points is None:
cut_points = ts.shape[0]
assert (
cut_points <= ts.shape[0]
), "Cannot have more cut points than length of the timeseries"
def split_permute_concat(x, split_points):
"""
Helper that splits, permutes and concats the timeseries.
"""
splits = np.split(x, split_points, axis=0)
np.random.shuffle(splits)
return np.concatenate(splits, axis=0)
previous_permutation = ts.copy()
for _ in range(repeats):
# generate random partition points without replacement
partion_points = np.sort(
np.random.choice(ts.shape[0], cut_points, replace=False)
)
current_permutation = split_permute_concat(
previous_permutation, partion_points
)
# assert we actually permute the timeseries, useful when using only one
# cutting point, i.e. two partitions so they are forced to swap
while np.all(current_permutation == previous_permutation):
current_permutation = split_permute_concat(
previous_permutation, partion_points
)
previous_permutation = current_permutation.copy()
return current_permutation
def get_single_AAFT_surrogate(ts, seed=None):
"""
Returns single 1D amplitude-adjusted Fourier transform surrogate.
Schreiber, T., & Schmitz, A. (2000). Surrogate time series. Physica D:
Nonlinear Phenomena, 142(3-4), 346-382.
:param ts: timeseries to transform as [time x N]
:type ts: np.ndarray
:param seed: seed for random number generator
:type seed: int|None
:return: 1D AAFT surrogate of timeseries
:rtype: np.ndarray
"""
# create Gaussian data
if ts.ndim == 1:
ts = ts[:, np.newaxis]
gaussian = np.broadcast_to(
np.random.randn(ts.shape[0])[:, np.newaxis], ts.shape
)
gaussian = np.sort(gaussian, axis=0)
# rescale data to Gaussian distribution
ranks = ts.argsort(axis=0).argsort(axis=0)
rescaled_data = np.zeros_like(ts)
for i in range(ts.shape[1]):
rescaled_data[:, i] = gaussian[ranks[:, i], i]
# do phase randomization
phase_randomized_data = get_single_FT_surrogate(rescaled_data, seed=seed)
if phase_randomized_data.ndim == 1:
phase_randomized_data = phase_randomized_data[:, np.newaxis]
# rescale back to amplitude distribution of original data
sorted_original = ts.copy()
sorted_original.sort(axis=0)
ranks = phase_randomized_data.argsort(axis=0).argsort(axis=0)
for i in range(ts.shape[1]):
rescaled_data[:, i] = sorted_original[ranks[:, i], i]
return rescaled_data.squeeze()
def get_single_IAAFT_surrogate(ts, n_iterations=10, seed=None):
"""
Returns single 1D iteratively refined amplitude-adjusted Fourier transform
surrogate. A set of AAFT surrogates is iteratively refined to produce a
closer match of both amplitude distribution and power spectrum of surrogate
and original data.
Schreiber, T., & Schmitz, A. (2000). Surrogate time series. Physica D:
Nonlinear Phenomena, 142(3-4), 346-382.
:param ts: timeseries to transform as [time x N]
:type ts: np.ndarray
:param n_iterations: number of iterations of the procedure
:type n_iterations: int
:param seed: seed for random number generator
:type seed: int|None
:return: 1D IAAFT surrogate of timeseries
:rtype: np.ndarray
"""
if ts.ndim == 1:
ts = ts[:, np.newaxis]
# FT of original data
xf = np.fft.rfft(ts, axis=0)
# FT amplitudes
xf_amps = np.abs(xf)
sorted_original = ts.copy()
sorted_original.sort(axis=0)
# starting point of iterative procedure
R = get_single_AAFT_surrogate(ts, seed=seed)
if R.ndim == 1:
R = R[:, np.newaxis]
# iterate: `R` is the surrogate with "true" amplitudes and `s` is the
# surrogate with "true" spectrum
for _ in range(n_iterations):
# get Fourier phases of R surrogate
r_fft = np.fft.rfft(R, axis=0)
r_phases = r_fft / np.abs(r_fft)
# transform back, replacing the actual amplitudes by the desired
# ones, but keeping the phases exp(i*phase(i))
s = np.fft.irfft(xf_amps * r_phases, n=ts.shape[0], axis=0)
# rescale to desired amplitude distribution
ranks = s.argsort(axis=0).argsort(axis=0)
for j in range(R.shape[1]):
R[:, j] = sorted_original[ranks[:, j], j]
return R.squeeze()
def get_p_values(data_value, surrogates_value, tailed="upper"):
"""
Return one-tailed or two-tailed values of percentiles with respect to
surrogate testing. Two-tailed test assumes that the distribution of the
test statistic under H0 is symmetric about 0.
:param data_value: value(s) from data analyses
:type data_value: np.ndarray
:param surrogates_value: value(s) from surrogate data analyses, shape must
be [num_surrogates, ...] where (...) is the shape of the data
:type surrogates_value: np.ndarray
:param tailed: which test statistic to compute: `upper`, `lower`, or `both`
:type tailed: str
:return: p-value of surrogate testing
:rtype: float
"""
assert data_value.shape == surrogates_value.shape[1:], (
f"Incompatible shapes: data {data_value.shape}; surrogates "
f"{surrogates_value.shape}"
)
num_surrogates = surrogates_value.shape[0]
if tailed == "upper":
significance = 1.0 - np.sum(
np.greater_equal(data_value, surrogates_value), axis=0
) / float(num_surrogates)
elif tailed == "lower":
significance = np.sum(
np.less_equal(data_value, surrogates_value), axis=0
) / float(num_surrogates)
elif tailed == "both":
significance = 2 * (
1.0
- np.sum(
np.greater_equal(np.abs(data_value), surrogates_value), axis=0
)
/ float(num_surrogates)
)
else:
raise ValueError(f"Unknown tail for testing: {tailed}")
return significance