-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsecondary.py
147 lines (120 loc) · 4.61 KB
/
secondary.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import os
import socket
import pickle
import threading
import time
import psutil
import stop
from config import Config
from constants import Constants
import worker
from utils import logger
from utils.com_socket import recv_msg
import sys
class SecondaryNode:
def __init__(self, argv):
self.sock = None
self.start_time = 0
self.dir_meta = ''
self.processes = dict()
self._stop_event = threading.Event()
self.failure_handler_thread = None
self.cpu_util = []
self.id = argv
def _connect_to_primary(self):
logger.info("Connecting to the primary node")
self.sock = socket.socket()
while True:
try:
self.sock.connect(Constants.SERVER_ADDRESS)
except OSError:
time.sleep(10)
logger.info("RETRYING")
continue
break
def _wait_for_start_command(self):
logger.info("Waiting for the start command from the primary node")
start_cmd = recv_msg(self.sock)
self.start_time, self.dir_meta = start_cmd
def _handle_deployments(self):
logger.info("Started deployment handler")
while True:
self.cpu_util.append((time.time(), psutil.cpu_percent()))
logger.debug(self.sock)
msg = recv_msg(self.sock)
logger.debug(msg)
if not msg:
self.should_stop = True
logger.debug(f"SECONDARY STOPPING: {self.id}")
break
else:
logger.debug(f"CREATE PROCESS fid={msg['fid']} time={time.time()}")
default_failure_timeout = None
if Config.SANITY_TEST >= 2:
default_failure_timeout = msg['fid'] * Config.STANDBY_TEST_CONFIG[2][1]
p = worker.WorkerProcess(start_time=self.start_time, dir_meta=self.dir_meta,
fail_timeout=default_failure_timeout,
**msg)
logger.debug(f"PROCESS_START fid={msg['fid']} time={time.time()}")
p.start()
self.processes[msg["fid"]] = p
def _handle_failures(self):
# to join the processes of failed FLSs
error_handling_socket = worker.WorkerSocket()
error_handling_socket.sock.settimeout(1)
process_counter = 0
while True:
if self._stop_event.is_set():
break
# logger.debug("STOPPING Processes")
try:
msg, _ = error_handling_socket.receive()
logger.debug(f"MSG: {msg}")
except socket.timeout:
# logger.debug(f"SECONDARY Timeout")
continue
if msg.fid in self.processes:
self.processes.pop(msg.fid).join()
process_counter += 1
logger.debug(f"PROCESS JOINT: {process_counter}")
def _start_failure_handler_thread(self):
self.failure_handler_thread = threading.Thread(target=self._handle_failures)
self.failure_handler_thread.start()
logger.info("Started failure handler")
def _stop_failure_handler_thread(self):
logger.debug(f"SHOULD STOP: TRUE")
self._stop_event.set()
self.failure_handler_thread.join()
def _ack_primary_node(self):
self.sock.sendall(pickle.dumps(True))
def _stop_processes(self):
logger.info(f"Stopping FLS processes {time.time()}")
stop.stop_all()
for p in self.processes.values():
if p.is_alive():
logger.debug(f"Process UNFINISHED: {p.name}")
p.join(Config.PROCESS_JOIN_TIMEOUT)
for p in self.processes.values():
if p.is_alive():
logger.debug(f"Process Alive: {p.name}")
p.terminate()
def _write_cpu_data(self, filename):
with open(filename, 'w') as file:
for data in self.cpu_util:
file.write(f"{data[0]},{data[1]}\n")
def start_node(self):
self._connect_to_primary()
self._wait_for_start_command()
self._start_failure_handler_thread()
self._handle_deployments()
def stop_node(self):
self._stop_failure_handler_thread()
self._stop_processes()
self._ack_primary_node()
if not os.path.exists(f"{self.dir_meta}/cpu_util/"):
os.makedirs(f"{self.dir_meta}/cpu_util/", exist_ok=True)
self._write_cpu_data(f"{self.dir_meta}/cpu_util/{self.id}_cpu_util.txt")
if __name__ == '__main__':
node = SecondaryNode(sys.argv[1:])
node.start_node()
node.stop_node()