-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient_sampler.py
202 lines (142 loc) · 5.18 KB
/
client_sampler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import time
import numpy as np
from abc import ABC, abstractmethod
class ClientsSampler(ABC):
r"""Base class for clients sampler
Attributes
----------
clients_weights_dict: Dict[int: float]
maps clients ids to their corresponding weight/importance in the objective function
participation_dict: Dict[int: float]
maps clients ids to their corresponding participation probabilities
activity_simulator: ActivitySimulator
activity_estimator: ActivityEstimator
unknown_participation_probs: Bool
if True, participation probabilities are estimated through ActivityEstimator
_time_step: int
tracks the number of steps
Methods
----------
__init__
sample_clients
step
"""
def __init__(
self,
clients,
participation_probs,
activity_simulator,
activity_estimator,
unknown_participation_probs,
*args,
**kwargs
):
"""
Parameters
----------
activity_simulator: ActivitySimulator
activity_estimator: ActivityEstimator
clients_weights_dict: Dict[int: float]
"""
n_clients = len(clients)
self.clients_weights_dict = self.get_client_weights_dict(clients)
self.participation_dict = self.get_participation_dict(n_clients, participation_probs)
self.activity_simulator = activity_simulator
self.activity_estimator = activity_estimator
self.unknown_participation_probs = unknown_participation_probs
if self.unknown_participation_probs:
print("Estimate participation probabilities")
self._time_step = -1
@staticmethod
def get_client_weights_dict(clients):
"""compute client weights as a proportion of training samples
Parameters
----------
clients : list
Returns
-------
dict : key is client_id and value is client_weight.
"""
clients_weights = np.array([client.n_train_samples for client in clients])
clients_weights = clients_weights / clients_weights.sum()
return {client.id: weight for client, weight in zip(clients, clients_weights)}
@staticmethod
def get_participation_dict(n_clients, participation_probs):
"""return a dictionary mapping client_id to participation_prob
Parameters
----------
n_clients : int
participation_probs : list
Returns
-------
dict : key is client_id and value is participation_prob
"""
client_probs = np.tile(participation_probs, n_clients // len(participation_probs))
return dict(enumerate(client_probs))
def get_active_clients(self, c_round):
"""receive the list of active clients
Parameters
----------
c_round:
Returns
-------
* List[int]
"""
return self.activity_simulator.get_active_clients(c_round)
def estimate_participation_probs(self, c_round):
"""receive the list of estimated client participations from the ActivityEstimator
Parameters
----------
c_round: int
Returns
-------
* Dict[int]: a dictionary with client_id as keys and estimated participation probabilities as values
"""
return self.activity_estimator.estimate_participation_probs(c_round)
def step(self):
"""update the internal step of the clients sampler
Parameters
----------
Returns
-------
None
"""
self._time_step += 1
@abstractmethod
def sample(self, c_round):
"""sample clients
Parameters
----------
c_round: int
Returns
-------
* List[int]: indices of the sampled clients_dict
* List[float]: weights to be associated to the sampled clients_dict
"""
pass
class UnbiasedClientsSampler(ClientsSampler):
"""
Samples all active clients with aggregation weight inversely proportional to their participation prob
"""
def sample(self, c_round):
"""implementation of the abstract method ClientSampler.sample for the UnbiasedClientSampler
Samples all active clients with aggregation weight inversely proportional to their participation probability.
Parameters
----------
c_round: int
Returns
-------
* List[int]: indices of the sampled clients_dict
* List[float]: weights to be associated to the sampled clients_dict
"""
sampled_clients_ids, sampled_clients_weights = [], []
active_clients = self.get_active_clients(c_round)
if self.unknown_participation_probs:
participation_probs = self.estimate_participation_probs(c_round)
else:
participation_probs = self.participation_dict
for client_id in active_clients:
sampled_clients_ids.append(client_id)
sampled_clients_weights.append(self.clients_weights_dict[client_id] / participation_probs[client_id])
self.step()
return sampled_clients_ids, sampled_clients_weights