-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
158 lines (133 loc) · 5 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import logging
import json
from typing import List, Dict, Any
from config import (
playwright_cookies,
login_email,
login_password,
)
from url_fetcher import fetch_urls
from data_parser import extract_and_save_data
from playwright.sync_api import (
sync_playwright,
Page,
Browser,
BrowserContext,
TimeoutError,
)
# Configure logging once at import time: INFO level and above, with each
# record rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
# Module-level logger, named after this module, used by the functions below.
logger = logging.getLogger(__name__)
def initialize_browser(playwright) -> tuple[Browser, BrowserContext]:
    """Launch a headed Chrome instance and a context pre-loaded with cookies.

    Args:
        playwright: The object yielded by ``sync_playwright()``; its
            ``chromium`` launcher is used to start the browser.

    Returns:
        A ``(browser, context)`` pair. The context already carries the
        cookies from ``playwright_cookies``; closing the browser is left to
        the caller.
    """
    launch_options = {
        "headless": False,
        "channel": "chrome",
        "args": ["--start-maximized"],
    }
    chrome = playwright.chromium.launch(**launch_options)

    # Seed the fresh context with the configured cookies before any page opens.
    session = chrome.new_context()
    session.add_cookies(playwright_cookies)
    return chrome, session
def login_required(page: Page) -> bool:
    """Detect whether the page asks for a login and, if so, open the login form.

    The check waits for the page to become network-idle, then waits up to
    two seconds for an "Anmelden" button. If that button is visible it is
    clicked, and the function waits up to six seconds for an
    "Anmelden/Registrieren" entry; when the second match of that text is
    visible it is clicked as well.

    Args:
        page: Playwright page that has already been navigated.

    Returns:
        True if the "Anmelden/Registrieren" entry was clicked (the caller is
        then expected to submit credentials); False on every other path.
    """
    login_selector = "button:has-text('Anmelden')"
    register_selector = "text=Anmelden/Registrieren"

    page.wait_for_load_state("networkidle")
    try:
        page.wait_for_selector(login_selector, timeout=2000)
    except TimeoutError:
        logger.warning("Login not required.")
        return False

    login_button = page.locator(login_selector)
    if not login_button.is_visible():
        logger.warning("Login not required.")
        return False

    logger.info("Login button found. Clicking to proceed.")
    login_button.scroll_into_view_if_needed()
    login_button.click()
    page.wait_for_load_state("networkidle")

    try:
        page.wait_for_selector(register_selector, timeout=6000)
        # nth() is zero-based, so this targets the second element matching
        # the text. NOTE(review): presumably the first match is not the
        # clickable entry - confirm against the live page.
        register_button = page.locator(register_selector).nth(1)
        if register_button.is_visible():
            logger.info("Login required.")
            register_button.click()
            return True
    except TimeoutError:
        # Covers both the selector wait and the click timing out.
        logger.warning("Login not required.")
        return False

    # The entry exists but is not visible: return an explicit False instead
    # of falling off the end of the function with an implicit None.
    logger.warning("Login not required.")
    return False
def perform_login(page: Page) -> None:
    """Fill the login form with the configured credentials and submit it.

    Waits for the page to be network-idle before typing and again after
    clicking the "Anmelden" text, so the function returns only once the
    post-login traffic has settled.

    Args:
        page: Playwright page currently showing the login form.
    """
    logger.info("Performing login.")
    page.wait_for_load_state("networkidle")

    # (selector, value) pairs, filled in the order the form presents them.
    form_fields = (
        ("#benutzername-input", login_email),
        ("#passwort-input", login_password),
    )
    for field_selector, field_value in form_fields:
        page.fill(field_selector, field_value)

    page.click("text=Anmelden")
    page.wait_for_load_state("networkidle")
def process_url(page: Page, url: str) -> None:
    """Log in on the current page when the site asks for it.

    Args:
        page: Playwright page that has already been navigated.
        url: Not used by this function; kept in the signature so existing
            callers keep working.

    Returns:
        None. The function only performs the login side effect; it never
        produced a dictionary, so the return annotation now says so.
    """
    if login_required(page):
        perform_login(page)
def capture_request_data(page: Page, url: str) -> Dict[str, Any]:
    """Load ``url`` and capture two JSON payloads from the page's network traffic.

    While the page loads, GET responses whose URL contains
    ``/pc/v1/bewerberdetails/`` or ``/pd/v1/kontaktdaten/`` are inspected.
    A payload with a truthy ``anrede`` key is stored as the personal
    details, one with a truthy ``refnr`` key as the general details. The
    page is reloaded, up to five attempts in total, until both are found.

    Args:
        page: Playwright page reused for every attempt.
        url: Address to open; its last ``/``-separated segment becomes the
            record ``id``.

    Returns:
        A dict with the keys ``personal_details`` and ``general_details``
        (each the captured JSON object, or None if never seen),
        ``response_text`` (bodies of matching responses that were not valid
        JSON) and ``id``.
    """
    url_id = url.split("/")[-1]
    logger.info(f"Processing URL ID: {url_id}")
    data = {
        "personal_details": None,
        "general_details": None,
        "response_text": [],
        "id": url_id,
    }
    max_retries = 5
    watched_endpoints = ("/pc/v1/bewerberdetails/", "/pd/v1/kontaktdaten/")

    def handle_response(response):
        """Store the first matching payload of each kind into ``data``."""
        if response.request.method != "GET":
            return
        if not any(endpoint in response.url for endpoint in watched_endpoints):
            return

        try:
            json_response = response.json()
        except json.JSONDecodeError:
            logger.warning(f"Non-JSON response received from {response.url}")
            data["response_text"].append(response.text())
            return

        if not isinstance(json_response, dict):
            # Valid JSON that is not an object (array, scalar) has no .get();
            # skip it instead of raising inside the event callback.
            return

        if json_response.get("anrede") and data["personal_details"] is None:
            logger.info("Personal details found")
            data["personal_details"] = json_response
        elif json_response.get("refnr") and data["general_details"] is None:
            logger.info("General details found")
            data["general_details"] = json_response

    retries = 0
    while retries < max_retries and (
        data["personal_details"] is None or data["general_details"] is None
    ):
        # Back off before reloading: 5 s before the second attempt,
        # 5 s + 10 s before the third and every later one.
        if retries > 0:
            logger.info(f"Retrying request {retries}/{max_retries}")
            page.wait_for_timeout(5000)
        if retries > 1:
            page.wait_for_timeout(10000)

        page.on("response", handle_response)
        try:
            page.goto(url)
            if login_required(page):
                perform_login(page)
            page.wait_for_load_state("networkidle")
        finally:
            # The page object outlives this call, so always detach the
            # listener, even when navigation or login raises.
            page.remove_listener("response", handle_response)

        retries += 1
        if data["personal_details"] is None:
            logger.info("Personal details not found, retrying.")
        if data["general_details"] is None:
            logger.info("General details not found, retrying.")
    return data
def get_request_data(urls: List[str]) -> List[Dict[str, Any]]:
    """Collect the captured request data for every URL using one browser session.

    A single browser, context and page are created and reused for all
    URLs; the browser is closed afterwards even if processing fails.

    Args:
        urls: Addresses to visit, in order.

    Returns:
        One result dict per URL, in input order, as produced by
        ``capture_request_data``. An empty input yields an empty list
        without starting Playwright or a browser.
    """
    if not urls:
        # Nothing to collect: do not start Playwright and open a browser
        # just to close it again.
        return []

    logger.info(f"Starting data collection for {len(urls)} URLs.")
    data_list = []
    with sync_playwright() as p:
        browser, context = initialize_browser(p)
        try:
            page = context.new_page()
            for url in urls:
                data_list.append(capture_request_data(page, url))
            logger.info(f"Number of URLs processed: {len(data_list)}")
        finally:
            browser.close()
            logger.info("Browser closed.")
    return data_list
if __name__ == "__main__":
    # Script entry point: read the URL list, collect the data for each URL
    # in the browser, then hand the results to the parser, which is given
    # a JSON path and an Excel path for its output.
    reference_urls = fetch_urls("input/refrence_numbers.txt")
    extract_and_save_data(
        get_request_data(reference_urls),
        "output/data.json",
        "output/parsed_data.xlsx",
    )