-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlookup_zones.py
148 lines (114 loc) · 5.53 KB
/
lookup_zones.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# our openprovider DNS migration script
# we're using ENV vars to login and generate a token
# after we have a token we retrieve records based on the a list of domains provided by the customer
# using postactivate from our virtualenv so to set some vars
import requests
import os
import json
import urllib3
# we don't care about cert errors for now, migration only
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class OpenProvider:
"""Create a connection to the OpenProvider API using the /v1beta endpoint.
Please provide valid username/password or use env var OPENPROVIDER_USERNAME and/or OPENPROVIDER_PASSWORD"""
def __init__(
self, username=None, password=None, url="https://api.openprovider.eu/v1beta"
):
"""Initializes the connection with the given username and password on the v1beta endpoint"""
if not password:
password = os.environ.get("OPENPROVIDER_PASSWORD")
if not username:
username = os.environ.get("OPENPROVIDER_USERNAME")
self.username = username
self.password = password
self.url = url
# Set up the API client using Session object so we can reuse certain values during request.
self.session = requests.Session()
# getting all kinds of SSL cert errors, skipping SSL check.
self.session.verify = False
self.session.headers["User-Agent"] = "openprovider.py/0.11.3"
self.session.headers["Content-Type"] = "application/json"
# during init fase we get token
self._get_token()
def _get_token(self):
""" get an access token using provided username and password"""
# define the URL for our login endpoint
# https://docs.openprovider.com/doc/all
token_url = f"{self.url}/auth/login"
# our body which we convert into json during our request
request_body = {}
request_body["username"] = self.username
request_body["password"] = self.password
result = self.session.post(token_url, json.dumps(request_body))
if result.ok:
# convert json to dict and assign token, will fail if no answer.
self.token = result.json()["data"]["token"]
# print(f"token: {self.token}")
else:
raise Exception("username or password wrong")
def get_zone(self, zone=None):
""" return all availble zones or records of an individual zone"""
# there is a limit on the numer of zones we can retrieve per call
# https://docs.openprovider.com/doc/all#tag/ZoneService
LIMIT = 500
records_retrieved = 0
all_zones = list()
all_records = list()
query_params = {"limit": LIMIT, "offset": 0}
# if zones var has a value lookup that specific zone
if zone:
url = f"{self.url}/dns/zones/{zone}/records"
else:
url = f"{self.url}/dns/zones/"
# we should have already have an access token use that as our Bearer token
if self.token:
self.session.headers["authorization"] = "Bearer " + self.token
else:
raise Exception("token missing")
# the data.total field of the response provides the number of available zones
# we're using a limit of LIMIT so we need to continue request zones until we reach data.total zones
# we expect at least 1 record
total_records = 1
while records_retrieved < total_records:
result = self.session.get(url, params=query_params)
body = result.json()
if result.ok:
# total records is the end of our list so len of all zones should be total zones in the dns provider
# both record types are using same structure so also showing total amount of results.
if total_records == 1:
total_records = int(body["data"]["total"])
if zone and total_records > 0:
# a zone has been defined, let's try to get some info but only if we have some result
# as we can get a 200 but with 0 records.
print(
f"looking up all records for zone: {zone} which has {total_records} records"
)
for record in body["data"]["results"]:
# just push DNS dict to all_records list
all_records.append(record)
records_retrieved = len(all_records)
elif total_records > 0:
# let's get all the zones
# using some list comprehension and building list of all zone names
res = [zone["name"] for zone in result.json()["data"]["results"]]
# now add list of zones to our all_zones list so we get one list of only the zone names
all_zones.extend(res)
# numer of records retrieved is also becoming our offset
records_retrieved = len(all_zones)
else:
# looks like we have nothing to return.
return all_zones
query_params["offset"] = records_retrieved
else:
raise Exception("something went wrong")
if zone:
# this will return a list with our record dicts
return all_records
else:
# this will return list with all zone names
return all_zones
def main():
op = OpenProvider()
print(op.get_zone())
if __name__ == "__main__":
main()