-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmock_server.py
76 lines (58 loc) · 1.88 KB
/
mock_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import zmq
from tradex.config import MOCK_SUB_PORT, MOCK_ROUTER_PORT
import time
from tradex.market.clear_database import main as mo
from tradex.market.fetch_history_hst import parse_hst
from tradex.tests.mocked_classes import MockedMarketPair
from datetime import datetime, timedelta
from threading import Thread
import pandas as pd
from threading import Thread
import logging
def pub_strat(port=45600):
context = zmq.Context()
publish = context.socket(zmq.PUB)
publish.bind(f'tcp://*:{port}')
now = datetime.now()
term = now + timedelta(minutes=1, seconds=10)
while datetime.now() < term:
try:
time.sleep(10)
publish.send_string("EURUSD")
print('Sent.....')
except KeyboardInterrupt:
publish.close()
context.term()
publish.send_string("EURUSD kill")
publish.close()
context.term()
def publish(port=MOCK_SUB_PORT):
context = zmq.Context()
publish = context.socket(zmq.PUB)
publish.bind(f'tcp://*:{port}')
now = datetime.now()
term = now + timedelta(minutes=1, seconds=10)
while datetime.now() < term:
try:
time_int = pd.Timestamp.now('UTC').timestamp()
publish.send_string(
f'EURUSD 1.9023;1.9879;{time_int}'
)
print('Moving on.....')
time.sleep(30)
except KeyboardInterrupt:
publish.close()
context.term()
publish.send_string('EURUSD kill')
publish.close()
context.term()
def req_response():
ctx = zmq.Context()
sock = ctx.socket(zmq.ROUTER)
sock.bind(f'tcp://*:{MOCK_ROUTER_PORT}')
msg = sock.recv_multipart()
sock.send_multipart([msg[0], b"", b"ok"])
logging.info("Successfully sent message to receiver socket....")
sock.close()
ctx.term()
logging.info("Breaking out of function....all successful....")