-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtweetify.py
209 lines (185 loc) · 7.48 KB
/
tweetify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Aug 11 18:35:17 2022
@author: efearikan
"""
from random import choice, seed
import os
import time
import pickle
import schedule
import spotipy
from spotipy.oauth2 import SpotifyOAuth
import tweepy
import track as tr
import message
class Tweetify():
    """Twitter bot that suggests and queues Spotify tracks on request.

    Constructing the bot authenticates against Spotify (spotipy) and Twitter
    (tweepy) and registers three recurring jobs with ``schedule``: a Spotify
    token check and a mention poll every 60 seconds, plus a music suggestion
    every day at 10:00.  Call :meth:`run` to start executing those jobs.
    """

    # File in which the id of the newest handled mention is persisted.
    STATE_FILE = 'dump.pickle'
    # HTTP settings shared by every spotipy client the bot creates.
    SPOTIFY_TIMEOUT = 10
    SPOTIFY_RETRIES = 10

    def __init__(self):
        """Authenticate with both services and register the scheduled jobs."""
        # Spotipy initialization
        self.scope = (
            "playlist-read-private, user-library-read, "
            "user-read-currently-playing, user-read-playback-state, "
            "user-modify-playback-state"
        )
        self.init_spotipy()

        # Tweepy initialization (credentials come from the environment)
        self.client = tweepy.Client(
            consumer_key=os.environ.get("CONSUMER_KEY"),
            consumer_secret=os.environ.get("CONSUMER_SECRET"),
            access_token=os.environ.get("ACCESS_TOKEN"),
            access_token_secret=os.environ.get("ACCESS_TOKEN_SECRET"),
            wait_on_rate_limit=True
        )
        self.twitter_id = self.client.get_me()[0]["id"]
        # Resume from the last handled mention so a restart does not answer
        # every old mention again.
        self.last_mention_id = self._load_last_mention_id()
        self.tracks = []
        seed(None)

        schedule.every(60).seconds.do(self.refresh_spotify)
        schedule.every(60).seconds.do(self.respond_to_mention)
        schedule.every().day.at("10:00").do(self.suggest_music)

    def _load_last_mention_id(self):
        """Return the mention id saved by a previous run, or 1 if unavailable."""
        try:
            # NOTE: unpickling is only acceptable because this file is
            # written by the bot itself; never point STATE_FILE at data from
            # an untrusted source.
            with open(self.STATE_FILE, 'rb') as file:
                return pickle.load(file)
        except (OSError, EOFError, pickle.UnpicklingError):
            return 1

    def _make_spotify_client(self, token):
        """Build a spotipy client for ``token`` with the shared HTTP settings."""
        return spotipy.Spotify(
            auth=token['access_token'],
            requests_timeout=self.SPOTIFY_TIMEOUT,
            retries=self.SPOTIFY_RETRIES)

    def init_spotipy(self):
        """Create the Spotify auth manager and client from the cached token.

        Raises:
            RuntimeError: if the auth manager has no cached token to offer.
        """
        self.auth_manager = SpotifyOAuth(scope=self.scope)
        self.token = self.auth_manager.get_cached_token()
        if self.token is None:
            raise RuntimeError(
                "No cached Spotify token available; authorize the "
                "application once before starting the bot.")
        self.sp = self._make_spotify_client(self.token)
        self.user_id = self.sp.me()['id']

    def refresh_spotify(self):
        """Replace the Spotify client when the current token has expired."""
        if not self.auth_manager.is_token_expired(self.token):
            print("Access token lookin good")
            return

        print("Acces token expired. Refreshing...")
        # Relies on the auth manager handing back a usable (refreshed) token.
        token = self.auth_manager.get_cached_token()
        if token is None:
            # Keep the old client rather than crash; the next run retries.
            print("Could not obtain a fresh Spotify access token")
            return
        self.token = token
        self.sp = self._make_spotify_client(token)

    def run(self):
        """Run the scheduled jobs forever, reporting job failures."""
        print("Starting the bot")
        while True:
            try:
                schedule.run_pending()
            except Exception as exc:
                # Report instead of silently swallowing; KeyboardInterrupt
                # and SystemExit are deliberately not caught so the bot can
                # be stopped.
                print("Scheduled job failed:", repr(exc))
            # Sleep outside the try block so a failing job can never turn
            # this loop into a busy retry loop.
            time.sleep(1)

    def get_user_playlists(self):
        """Return every playlist owned by the authenticated Spotify user.

        All result pages are followed.  Playlists the user merely follows
        (owner id differs from ``self.user_id``) are skipped.
        NOTE: the user's saved ("Liked Songs") tracks are not included.
        """
        res = []
        page = self.sp.current_user_playlists()
        while page:
            res.extend(
                playlist for playlist in page['items']
                if playlist['owner']['id'] == self.user_id)
            page = self.sp.next(page) if page.get('next') else None
        return res

    def get_tracks(self, playlists):
        """Append the tracks of every given playlist to ``self.tracks``."""
        for playlist in playlists:
            results = self.sp.playlist(playlist['id'], fields="tracks,next")
            tracks = results['tracks']
            while tracks:
                self.iterate_tracks(
                    tracks, playlist["name"], playlist["external_urls"])
                tracks = self.sp.next(tracks) if tracks['next'] else None

    def iterate_tracks(self, results, playlist, playlist_link):
        """Convert one page of playlist items into ``Track`` objects.

        Args:
            results: a page of playlist items (a mapping with an ``items``
                list whose entries carry a ``track`` mapping).
            playlist: name of the playlist the page belongs to.
            playlist_link: the playlist's ``external_urls`` value.
        """
        for item in results['items']:
            track = item['track']
            # Items whose track is missing (None) and local files cannot be
            # linked to, so they are skipped.
            if track is None or track["is_local"]:
                continue
            self.tracks.append(
                tr.Track(track['artists'][0], track['name'],
                         track["external_urls"]["spotify"],
                         playlist, playlist_link))

    def get_random_track(self):
        """Reload the user's playlists and return one track at random.

        Raises:
            IndexError: if no track could be collected.
        """
        # Start from a clean list; otherwise every call would append another
        # full copy of the library to self.tracks.
        self.tracks = []
        self.get_tracks(self.get_user_playlists())
        return choice(self.tracks)

    def basic_tweet(self, msg, mention_id=None):
        """Tweet ``msg``, as a reply to ``mention_id`` when one is given."""
        self.client.create_tweet(
            text=msg, in_reply_to_tweet_id=(mention_id)
        )

    def respond_to_mention(self):
        """Answer every mention newer than ``self.last_mention_id``.

        Follows the authors of the new mentions, handles each mention, then
        stores the newest mention id in memory and in ``STATE_FILE``.
        """
        mentions = self.client.get_users_mentions(
            self.twitter_id, user_auth=True, since_id=self.last_mention_id,
            expansions='author_id')
        if mentions.data is None:
            print("Sleeping")
            return

        for user in (mentions.includes or {}).get("users", []):
            # is_following() compares usernames, so pass the name rather
            # than the user object itself.
            self.is_following(user.username)

        for mention in mentions.data:
            try:
                self._handle_mention(mention)
            except Exception as exc:
                # One bad mention must not stop last_mention_id from
                # advancing; otherwise the mentions already answered in this
                # batch would be answered again on every poll.
                print("Failed to respond to mention",
                      mention.id, repr(exc))

        self.last_mention_id = mentions.meta["newest_id"]
        with open(self.STATE_FILE, 'wb') as file:
            pickle.dump(self.last_mention_id, file)

    def _handle_mention(self, mention):
        """Like a single mention and reply according to its command."""
        mention_id = mention.id
        # Like the tweet first!
        self.client.like(mention_id, user_auth=True)
        text = str(mention)
        print("Responding to message")
        if "!music" in text:
            self.suggest_music(mention_id)
        elif "!queue" in text:
            # Whatever remains after removing the handle and the command is
            # used as the search query.
            query = text.replace(
                "@Tweetify_Bot", '').replace("!queue", '')
            self.add_to_queue(mention_id, query)
        elif "good bot" in text or "Good bot" in text:
            self.basic_tweet(message.AprrMessage().get, mention_id)
        else:
            self.basic_tweet(message.HelpMessage().get, mention_id)

    def suggest_music(self, mention_id=None):
        """Tweet a random track, as a reply when ``mention_id`` is given."""
        track = self.get_random_track()
        msg = message.SuggestionMessage()
        msg.format_text(track.name, track.artist,
                        track.playlist,
                        track.playlist_link,
                        track.url)
        self.basic_tweet(msg.get, mention_id)

    def help_tweet(self, mention_id):
        """Reply to ``mention_id`` with the help message."""
        self.basic_tweet(message.HelpMessage().get, mention_id)

    def add_to_queue(self, mention_id, mention):
        """Queue the best search hit for ``mention`` and tweet the outcome.

        Args:
            mention_id: id of the tweet to reply to.
            mention: free-text search query taken from the mention.
        """
        if self.sp.current_user_playing_track() is None:
            # Nothing is playing at the moment.
            self.basic_tweet(message.QueueMessage(False).get, mention_id)
            return

        query = mention.strip()
        items = self.sp.search(query)["tracks"]["items"] if query else []
        if not items:
            self.basic_tweet(
                "Sorry, I couldn't find that song on Spotify.", mention_id)
            return

        res = items[0]
        url = res["external_urls"]["spotify"]
        track = tr.Track(res["artists"][0], res["name"], url)
        # Queue first: the confirmation must only go out once the track has
        # actually been accepted by Spotify.
        self.sp.add_to_queue(url)
        msg = message.QueueMessage(True)
        msg.format_text(track.name, track.artist, track.url)
        self.basic_tweet(msg.get, mention_id)

    def follow_followers(self):
        """Follow every account returned by the followers lookup."""
        followers = self.client.get_users_followers(
            self.twitter_id, user_auth=True)
        # data is None when the lookup returns no accounts.
        for follower in followers.data or []:
            self.client.follow_user(follower.id, user_auth=True)

    def is_following(self, username):
        """Follow ``username`` unless the bot already follows that account.

        NOTE(review): only the accounts returned by a single
        ``get_users_following`` call are checked - confirm whether
        pagination is needed for accounts following many users.
        """
        followings = self.client.get_users_following(
            self.twitter_id, user_auth=True)
        followed = {following.username
                    for following in followings.data or []}
        if username in followed:
            return
        user = self.client.get_user(username=username, user_auth=True).data
        if user is None:
            # The lookup returned no user; there is nothing to follow.
            return
        self.client.follow_user(user.id, user_auth=True)