This repository has been archived by the owner on Nov 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
131 lines (98 loc) · 4.05 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import requests
import os
import sys
import json
import time
import ctypes
from secrets import bearer_token
### Parameters ###
testing = False
user_id = 3466569498 # Twitter UserID of the person you're listening to
wait_time = 3600 # 1 Hour
failLimit = 10 # How many retries to allow before exitting process
#################
def bearer_oauth(r):
"""
Method required by bearer token authentication.
"""
r.headers["Authorization"] = f"Bearer {bearer_token}"
r.headers["User-Agent"] = "v2UserTweetsPython"
return r
def connect_to_endpoint(url, params):
response = requests.request("GET", url, auth=bearer_oauth, params=params)
if response.status_code != 200:
raise Exception(
"Request returned an error: {} {}".format(
response.status_code, response.text
)
)
return response.json()
def get_testing_data(type):
if type == "new":
path = "/testing_data/new.json"
else:
path = "/testing_data/old.json"
with open(os.path.dirname(__file__) + path) as jsonFile:
jsonObject = json.load(jsonFile)
jsonFile.close()
return jsonObject
def download_wallpaper(url):
with open(os.path.dirname(__file__) + "/" + "wallpaper.jpg", 'wb') as handle:
response = requests.get(url, stream=True)
if response.status_code != 200:
raise Exception(
"Request returned an error: {} {}".format(
response.status_code, response.text
)
)
for block in response.iter_content(1024):
if not block:
break
handle.write(block)
def set_wallpaper():
ctypes.windll.user32.SystemParametersInfoW(20, 0, os.path.dirname(__file__) + "/" + "wallpaper.jpg", 0)
def main():
url = "https://api.twitter.com/2/users/{}/tweets".format(user_id)
params = {"tweet.fields": "created_at,attachments", "media.fields": "url", "max_results": 5, "exclude": "retweets,replies", "expansions": "attachments.media_keys"}
connected = False
failCounter = 0
while True:
connected = False
if testing:
json_response = get_testing_data("new")
else:
# Helps prevent crash on startup, since it may take a while for the computer to connect to the internet
while connected == False:
try:
json_response = connect_to_endpoint(url, params)
connected = True
except requests.ConnectionError:
failCounter += 1
print("Couldn't connect, possibly no internet connection available")
if failCounter >= failLimit:
print("Too many retries, exitting process...")
sys.exit()
else:
print("Retrying in 5 seconds...")
time.sleep(5)
with open(os.path.dirname(__file__) + "/" + "current_wallpaper.txt", "r+") as current_wallpaper_file:
current_wallpaper = current_wallpaper_file.read()
print("Current wallpaper url: " + current_wallpaper)
newest_tweeted_wallpaper = json_response["includes"]["media"][0]["url"]
if current_wallpaper == newest_tweeted_wallpaper:
print("Newest wallpaper matches the current one")
else:
print("New wallpaper tweeted with url: " + newest_tweeted_wallpaper)
print("Downloading wallpaper...")
download_wallpaper(newest_tweeted_wallpaper)
print("Setting wallpaper...")
set_wallpaper()
print("Updating record...")
current_wallpaper_file.seek(0)
current_wallpaper_file.write(newest_tweeted_wallpaper)
current_wallpaper_file.truncate()
print(f"Sleeping... (for {wait_time}s)")
time.sleep(wait_time)
#print(json.dumps(json_response, indent=4, sort_keys=True))
if __name__ == "__main__":
main()