forked from danihodovic/celery-exporter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconftest.py
59 lines (43 loc) · 1.29 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import socket
import pytest
from src.exporter import Exporter
@pytest.fixture(scope="session")
def celery_config():
    """Return the Celery settings shared by the whole test session.

    The broker URL points at the ``memory://`` transport, and both
    ``worker_send_task_events`` and ``task_send_sent_event`` are enabled.
    """
    settings = dict(
        broker_url="memory://localhost/",
        worker_send_task_events=True,
        task_send_sent_event=True,
    )
    return settings
# https://github.com/celery/celery/pull/6632
@pytest.fixture(scope="session")
def celery_worker_parameters():
    """Return the session-wide parameters for the test Celery worker.

    ``without_heartbeat`` is explicitly set to ``False``.
    NOTE(review): presumably this keeps worker heartbeats enabled for the
    tests -- confirm against the pull request linked above.
    """
    worker_parameters = dict(without_heartbeat=False)
    return worker_parameters
@pytest.fixture(scope="session")
def find_free_port():
    """Provide a callable that returns a TCP port number that is currently free.

    Adapted from
    https://gist.github.com/bertjwregeer/0be94ced48383a42e70c3d9fff1f4ad0

    Returns:
        A zero-argument function. Each call binds a throwaway socket to
        port 0, reads back the port number that was assigned, closes the
        socket and returns that number.

    Note:
        The probing socket is closed before the port number is returned, so
        the port is only known to have been free at the time of the call.
    """

    def _find_free_port():
        # Using the socket as a context manager guarantees it is closed even
        # when setsockopt() or bind() raises, so no descriptor is leaked.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Binding to port 0 asks the OS to pick any available port.
            sock.bind(("0.0.0.0", 0))
            return sock.getsockname()[1]

    return _find_free_port
@pytest.fixture()
def exporter(find_free_port, celery_config):
    """Yield an ``Exporter`` instance with a test configuration attached.

    The configuration is stored on the instance as ``cfg``. Its port comes
    from calling ``find_free_port`` and its broker URL is taken from the
    ``celery_config`` fixture; the remaining values are fixed.
    """
    # Build the settings first so the port lookup happens before the
    # Exporter is constructed.
    settings = dict(
        port=find_free_port(),
        broker_url=celery_config["broker_url"],
        broker_transport_option=["visibility_timeout=7200"],
        broker_ssl_option=[],
        retry_interval=5,
        log_level="DEBUG",
    )
    instance = Exporter()
    instance.cfg = settings
    yield instance
@pytest.fixture()
def hostname():
    """Return the host name reported by ``socket.gethostname()``."""
    local_name = socket.gethostname()
    return local_name