-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtestnet.py
214 lines (177 loc) · 7.38 KB
/
testnet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""
Bring up a rippled testnetwork from a set of config files with fixed ips.
"""
from contextlib import contextmanager
import glob
import os
import subprocess
import time
from typing import Callable, List, Optional, Set, Union
from command import ServerInfo
from config_file import ConfigFile
from ripple_client import RippleClient
class Network:
# If run_server is None, run all the servers.
# This is useful to help debugging
def __init__(
self,
exe: str,
configs: List[ConfigFile],
*,
command_logs: Optional[List[str]] = None,
run_server: Optional[List[bool]] = None,
# undocumented feature. If with_rr is not None, assume it points to the rr debugger executable
# and run server 0 under rr
with_rr: Optional[str] = None,
extra_args: Optional[List[List[str]]] = None,
):
self.with_rr = with_rr
if not configs:
raise ValueError(f"Must specify at least one config")
if run_server and len(run_server) != len(configs):
raise ValueError(
f"run_server length must match number of configs (or be None): {len(configs) = } {len(run_server) = }"
)
self.configs = configs
self.clients = []
self.running_server_indexes = set()
self.processes = {}
if not run_server:
run_server = []
run_server += [True] * (len(configs) - len(run_server))
self.run_server = run_server
if not command_logs:
command_logs = []
command_logs += [None] * (len(configs) - len(command_logs))
self.command_logs = command_logs
# remove the old database directories.
# we want tests to start from the same empty state every time
for config in self.configs:
db_path = config.database_path.get_line()
if db_path and os.path.isdir(db_path):
files = glob.glob(f"{db_path}/**", recursive=True)
for f in files:
if os.path.isdir(f):
continue
os.unlink(f)
for config, log in zip(self.configs, self.command_logs):
client = RippleClient(config=config, command_log=log, exe=exe)
self.clients.append(client)
self.servers_start(extra_args=extra_args)
def shutdown(self):
for a in self.clients:
a.shutdown()
self.servers_stop()
def num_clients(self) -> int:
return len(self.clients)
def get_client(self, i: int) -> RippleClient:
return self.clients[i]
def get_configs(self) -> List[ConfigFile]:
return [c.config for c in self.clients]
def get_pids(self) -> List[int]:
return [c.get_pid() for c in self.clients if c.get_pid() is not None]
# Get a dict of the server_state, validated_ledger_seq, and complete_ledgers
def get_brief_server_info(self) -> dict:
ret = {"server_state": [], "ledger_seq": [], "complete_ledgers": []}
for c in self.clients:
r = c.get_brief_server_info()
for (k, v) in r.items():
ret[k].append(v)
return ret
# returns true if the server is running, false if not. Note, this relies on
# servers being shut down through the `servers_stop` interface. If a server
# crashes, or is started or stopped through other means, an incorrect status
# may be reported.
def get_running_status(self) -> List[bool]:
return [i in self.running_server_indexes for i in range(len(self.clients))]
def is_running(self, index: int) -> bool:
return index in self.running_server_indexes
def wait_for_validated_ledger(self, server_index: Optional[int] = None):
"""
Don't return until the network has at least one validated ledger
"""
if server_index is None:
for i in range(len(self.configs)):
self.wait_for_validated_ledger(i)
return
client = self.clients[server_index]
for i in range(600):
r = client.send_command(ServerInfo())
state = None
if "info" in r:
state = r["info"]["server_state"]
if state == "proposing":
print(f"Synced: {server_index} : {state}", flush=True)
break
if not i % 10:
print(f"Waiting for sync: {server_index} : {state}", flush=True)
time.sleep(1)
for i in range(600):
r = client.send_command(ServerInfo())
state = None
if "info" in r:
complete_ledgers = r["info"]["complete_ledgers"]
if complete_ledgers and complete_ledgers != "empty":
print(
f"Have complete ledgers: {server_index} : {state}", flush=True
)
return
if not i % 10:
print(
f"Waiting for complete_ledgers: {server_index} : {complete_ledgers}",
flush=True,
)
time.sleep(1)
raise ValueError("Could not sync server {client.config_file_name}")
def servers_start(
self,
server_indexes: Optional[Union[Set[int], List[int]]] = None,
*,
extra_args: Optional[List[List[str]]] = None,
):
if server_indexes is None:
server_indexes = [i for i in range(len(self.clients))]
if extra_args is None:
extra_args = []
extra_args += [list()] * (len(self.configs) - len(extra_args))
for i in server_indexes:
if i in self.running_server_indexes or not self.run_server[i]:
continue
client = self.clients[i]
to_run = [client.exe, "--conf", client.config_file_name]
if self.with_rr and i == 0:
to_run = [self.with_rr, "record"] + to_run
print(f"Starting server with rr {client.config_file_name}")
else:
print(f"Starting server {client.config_file_name}")
fout = open(os.devnull, "w")
p = subprocess.Popen(
to_run + extra_args[i], stdout=fout, stderr=subprocess.STDOUT
)
client.set_pid(p.pid)
print(
f"started rippled: config: {client.config_file_name} PID: {p.pid}",
flush=True,
)
self.running_server_indexes.add(i)
self.processes[i] = p
time.sleep(2) # give servers time to start
def servers_stop(self, server_indexes: Optional[Union[Set[int], List[int]]] = None):
if server_indexes is None:
server_indexes = self.running_server_indexes.copy()
if 0 in server_indexes:
print(
f"WARNING: Server 0 is being stopped. RPC commands cannot be sent until this is restarted."
)
for i in server_indexes:
if i not in self.running_server_indexes:
continue
client = self.clients[i]
to_run = [client.exe, "--conf", client.config_file_name]
fout = open(os.devnull, "w")
subprocess.Popen(to_run + ["stop"], stdout=fout, stderr=subprocess.STDOUT)
self.running_server_indexes.discard(i)
for i in server_indexes:
self.processes[i].wait()
del self.processes[i]
self.get_client(i).set_pid(-1)