-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrbparser.py
186 lines (163 loc) · 8.95 KB
/
rbparser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
import time
import requests
import json
import radarboxurl
class aircraft_scrape:
def __init__(self):
radarbox_site = radarboxurl.rbsite()
self.baseurl = radarbox_site.baseurl
self.flighturl = radarbox_site.flighturl
def rbparse(self, tail_id):
url = self.baseurl + tail_id
r = requests.get(url)
try:
soup = BeautifulSoup(r.text, "html.parser")
script_tag = soup.find_all('script', type='module')
# Removing custom website function window.init() so it appears as a JSON body.
script_tag2 = script_tag[2].text.replace('window.init(', '').replace(')', '')
except IndexError:
try:
url = self.flighturl + tail_id #Check if input is flight number.
r = requests.get(url)
soup = BeautifulSoup(r.text, "html.parser")
script_tag = soup.find_all('script', type='module')
# Removing custom website function window.init() so it appears as a JSON body.
script_tag2 = script_tag[2].text.replace('window.init(', '').replace(')', '')
except Exception as e:
print(f'Status Code: {r.status_code}')
raise Exception('Request Failed: ', e)
# JSON parsing for the flights table.
json_parse = json.loads(script_tag2).get('current') # json body of the last completed or live flight.
return json_parse
#return true if aircraft does not fly for more than 2 days.
def is_grounded(self,tail_id):
global output_datetime
flight_info=self.rbparse(tail_id)
if flight_info.get('status') =='live':
return False
else:
utc_year = flight_info.get('year_utc')
utc_day = flight_info.get('day_utc')
utc_time = flight_info.get('arre_utc') or flight_info.get('arrival_utc') or flight_info.get('arrs_utc')
#parsing without time and year+day only if arrival time is missing. So script doesn't crash when formatting time.
if utc_time is None:
#Only date (UTC).
date_string = utc_year +" "+ utc_day
input_datetime=datetime.strptime(date_string,"%Y %d %b")
output_datetime=input_datetime.strftime("%d %b %Y")
#Else date+time
else:
#Date & Time (UTC).
date_string = utc_year +" "+ utc_day +" "+ utc_time
input_datetime=datetime.strptime(date_string,"%Y %d %b %H:%M")
output_datetime=input_datetime.strftime("%d %b %Y %I:%M %p")
if input_datetime < datetime.utcnow() - timedelta(days=2): # Change threshold from here if 2 days is not enough.
return True
else:
return False
def current_status(self,ac_tail):
flight_info=self.rbparse(ac_tail)
self.reg=flight_info.get('acr')
self.ac_type=flight_info.get('acd')
self.flight_no=flight_info.get('fnia')
self.airline=flight_info.get('alna')
self.status=flight_info.get('status')
self.dep_date=flight_info.get('depdate_utc')
self.est_deptime=flight_info.get('depe_utc')
self.act_deptime=flight_info.get('departure_utc')
self.org_airport=flight_info.get('aporgia')
self.org_city=flight_info.get('aporgci')
self.org_country=flight_info.get('aporgco')
self.arr_date=flight_info.get('arrdate_utc')
self.est_arrtime=flight_info.get('arrs_utc')
self.act_arrtime=flight_info.get('arrival_utc')
self.arr_airport=flight_info.get('apdstia')
self.arr_city=flight_info.get('apdstci')
self.arr_country=flight_info.get('apdstco')
self.future=flight_info.get('isFuture') # if true the flight_id is a future flight.
self.org_temp=flight_info.get('org_temp_c')
self.org_sky_cover=flight_info.get('org_sky_cover')
self.dst_temp=flight_info.get('dst_temp_c')
self.dst_sky_cover=flight_info.get('dst_sky_cover')
self.altitude=flight_info.get('alt')
self.sitrep=flight_info.get('statusLabel')['text']
self.distance=flight_info.get('distance')
self.progress=flight_info.get('progress')
self.mode_s=flight_info.get('ms')
return {
'reg': self.reg,
'ac_type': self.ac_type,
'flight_no': self.flight_no,
'airline': self.airline,
'status': self.status,
'dep_date': self.dep_date,
'est_deptime': self.est_deptime,
'act_deptime': self.act_deptime,
'org_airport': self.org_airport,
'org_city': self.org_city,
'org_country': self.org_country,
'arr_date': self.arr_date,
'est_arrtime': self.est_arrtime,
'act_arrtime': self.act_arrtime,
'arr_airport': self.arr_airport,
'arr_city': self.arr_city,
'arr_country': self.arr_country,
'future': self.future,
'org_temp': self.org_temp,
'org_sky_cover': self.org_sky_cover,
'dst_temp': self.dst_temp,
'dst_sky_cover': self.dst_sky_cover,
'altitude': self.altitude,
'distance': self.distance,
'mode_s': self.mode_s
}
def report_status(self, tail):
result = {} # Initialize an empty dictionary
if self.is_grounded(tail):
self.current_status(tail) # if you still want to access the current_status attributes
result['message'] = 'Aircraft is grounded. Last flight on: ' + str(output_datetime)
else:
self.current_status(tail)
if self.status == 'live':
result['message'] = f'{self.reg} is airborne. Flight No: {self.flight_no}'
result['ac_type'] = self.ac_type
result['flight_origin'] = self.org_airport + " " + self.org_city + "/" + self.org_country
result['flight_destination'] = self.arr_airport + " " + self.arr_city + "/" + self.arr_country
result['status'] = self.status
result['altitude'] = "-" if self.altitude is None else str(self.altitude) + ' feet'
result['sitrep'] = self.sitrep
result['estimated_takeoff'] = str(self.est_deptime) + " UTC " + str(self.dep_date)
result['actual_takeoff'] = str(self.act_deptime) + " UTC " + str(self.dep_date)
result['estimated_arrival'] = str(self.est_arrtime) + " UTC " + str(self.arr_date)
result['distance_nm'] = round(self.distance * 0.000539957, 0) if self.distance is not None else None
result['report_distance'] = (
str(round(result['distance_nm'] * self.progress / 100, 1)) + " miles flown. " +
str(round(result['distance_nm'] - (result['distance_nm'] * self.progress / 100), 1)) +
" miles remaining."
)
result['org_report_sky'] = self.org_sky_cover + " ", str(self.org_temp) + "°C"
result['dst_report_sky'] = self.dst_sky_cover + " ", str(self.dst_temp) + "°C"
elif self.status == 'landed':
result['message'] = f'{self.reg} is on the ground. Last completed flight as follows: '
result['ac_type'] = self.ac_type
result['flight_origin'] = str(self.org_airport) + " " + str(self.org_city) + "/" + str(self.org_country)
result['flight_destination'] = str(self.arr_airport) + " " + str(self.arr_city) + "/" + str(self.arr_country)
result['estimated_takeoff'] = str(self.est_deptime) + " UTC " + str(self.dep_date)
result['actual_takeoff'] = str(self.act_deptime) + " UTC " + str(self.dep_date)
result['estimated_arrival'] = str(self.est_arrtime) + " UTC " + str(self.arr_date)
result['actual_arrival'] = str(self.act_arrtime) + " UTC " + str(self.arr_date)
result['distance_nm'] = round(self.distance * 0.000539957, 0) if self.distance is not None else None
result['status'] = self.status
result['sitrep'] = self.sitrep
elif self.status == 'estimated':
result['message'] = f'Flight number {self.flight_no} is scheduled for {self.reg}'
result['ac_type'] = self.ac_type
result['flight_origin'] = str(self.org_airport) + " " + str(self.org_city) + "/" + str(self.org_country)
result['flight_destination'] = str(self.arr_airport) + " " + str(self.arr_city) + "/" + str(self.arr_country)
result['estimated_takeoff'] = str(self.est_deptime) + " UTC " + str(self.dep_date)
result['estimated_arrival'] = str(self.est_arrtime) + " UTC " + str(self.arr_date)
result['distance_nm'] = round(self.distance * 0.000539957, 0) if self.distance is not None else None
result['sitrep'] = self.sitrep
return result