-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathmain.py
122 lines (83 loc) · 3.7 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import logging
import pathlib
from typing import List
import requests
import cloudflare
class App:
    """Sync one downloaded ad-block list into Cloudflare lists and a policy.

    The workflow (see ``run``) downloads the list, parses it into domains,
    replaces the Cloudflare lists whose names start with ``name_prefix`` when
    their total size differs, and creates or updates a single policy that
    references those lists. All Cloudflare access goes through the
    project-local ``cloudflare`` module.
    """

    # Maximum number of domains placed in one Cloudflare list.
    # NOTE(review): presumably Cloudflare's per-list item limit - confirm.
    LIST_CHUNK_SIZE = 1000

    # Seconds to wait for the blocklist server before giving up; without a
    # timeout ``requests.get`` can block forever on a stalled connection.
    DOWNLOAD_TIMEOUT = 30

    def __init__(self, adlist_name: str, adlist_url: str):
        """Store the list identity and derive the Cloudflare name prefix.

        Args:
            adlist_name: Short name of the list; also used as the local file
                name the download is written to.
            adlist_url: URL the list is downloaded from.
        """
        self.adlist_name = adlist_name
        self.adlist_url = adlist_url
        self.name_prefix = f"[AdBlock-{adlist_name}]"
        self.logger = logging.getLogger("main")

    def run(self) -> None:
        """Download the list and bring Cloudflare in line with it.

        Raises:
            RuntimeError: If more than one policy matching ``name_prefix``
                already exists, since it is ambiguous which one to update.
        """
        self.download_file()
        domains = self.convert_to_domain_list(self.adlist_name)

        # Check if the list is already in Cloudflare.
        cf_lists = cloudflare.get_lists(self.name_prefix)
        self.logger.info(f"Number of lists in Cloudflare: {len(cf_lists)}")

        # Only the total size is compared (not the contents), so a list whose
        # entries changed but whose length did not is treated as unchanged.
        if len(domains) == sum(cf_list["count"] for cf_list in cf_lists):
            self.logger.warning("Lists are the same size, skipping")
        else:
            # Replace wholesale: delete every existing list, then recreate.
            for cf_list in cf_lists:
                self.logger.info(f"Deleting list {cf_list['name']}")
                cloudflare.delete_list(cf_list["id"])

            cf_lists = []
            for chunk in self.chunk_list(domains, self.LIST_CHUNK_SIZE):
                list_name = f"{self.name_prefix} {len(cf_lists) + 1}"
                self.logger.info(f"Creating list {list_name}")
                cf_lists.append(cloudflare.create_list(list_name, chunk))

        # Get the gateway policies.
        cf_policies = cloudflare.get_firewall_policies(self.name_prefix)
        self.logger.info(f"Number of policies in Cloudflare: {len(cf_policies)}")

        policy_name = f"{self.name_prefix} Block Ads"
        list_ids = [cf_list["id"] for cf_list in cf_lists]

        # Set up the gateway policy: create it, or update the only match.
        if not cf_policies:
            self.logger.info("Creating firewall policy")
            cloudflare.create_gateway_policy(policy_name, list_ids)
        elif len(cf_policies) != 1:
            self.logger.error("More than one firewall policy found")
            raise RuntimeError("More than one firewall policy found")
        else:
            self.logger.info("Updating firewall policy")
            cloudflare.update_gateway_policy(
                policy_name, cf_policies[0]["id"], list_ids
            )

        self.logger.info("Done")

    def download_file(self) -> None:
        """Download ``adlist_url`` into a local file named ``adlist_name``.

        Raises:
            requests.HTTPError: If the server answers with an error status;
                this keeps an HTML error page from being parsed as a list.
            requests.RequestException: On connection problems or timeout.
        """
        self.logger.info(f"Downloading file from {self.adlist_url}")
        response = requests.get(
            self.adlist_url,
            allow_redirects=True,
            timeout=self.DOWNLOAD_TIMEOUT,
        )
        response.raise_for_status()

        path = pathlib.Path(self.adlist_name)
        # write_bytes opens, writes and closes the file in one call.
        path.write_bytes(response.content)
        self.logger.info(f"File size: {path.stat().st_size}")

    def convert_to_domain_list(self, file_name: str) -> List[str]:
        """Parse a hosts file or a plain domain list into a list of domains.

        Each line is handled on its own: comments (full-line or inline) and
        blank lines are ignored; a line with several whitespace-separated
        fields is read as a hosts entry (``<ip> <hostname> ...``) and its
        second field is taken; a single-field line is taken as the domain
        itself. ``localhost`` entries are dropped.

        Args:
            file_name: Path of the file to parse.

        Returns:
            The domains in file order (duplicates are not removed).
        """
        # The file was saved as raw bytes, so decode explicitly rather than
        # with the locale default; undecodable bytes are replaced instead of
        # aborting the whole sync.
        with open(file_name, "r", encoding="utf-8", errors="replace") as f:
            data = f.read()

        domains = []
        for raw_line in data.splitlines():
            # Strip inline comments and surrounding whitespace in one go; a
            # full-line comment or blank line becomes the empty string.
            entry = raw_line.split("#", 1)[0].strip()
            if not entry:
                continue

            fields = entry.split()
            # Hosts entries carry the address first and the hostname second;
            # plain lists carry only the domain.
            domain = fields[1] if len(fields) > 1 else fields[0]

            # Skip the localhost entry.
            if domain == "localhost":
                continue
            domains.append(domain)

        self.logger.info(f"Number of domains: {len(domains)}")
        return domains

    def chunk_list(self, _list: List[str], n: int):
        """Yield consecutive slices of ``_list`` holding at most ``n`` items.

        Args:
            _list: The items to split.
            n: Maximum slice length; passed to ``range`` as the step.

        Yields:
            Lists of up to ``n`` items, in the original order.
        """
        for start in range(0, len(_list), n):
            yield _list[start : start + n]
if __name__ == "__main__":
    # Script entry point: sync the AdAway hosts list under the name "Adaway".
    adaway_sync = App(
        adlist_name="Adaway",
        adlist_url="https://adaway.org/hosts.txt",
    )
    adaway_sync.run()