-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbot.py
151 lines (129 loc) · 4.78 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
import tweepy
import logging
import threading
from twitchAPI import Twitch, EventSub
from dotenv import load_dotenv
from functions.functionsBot import compareImages
from functions.twitchAPI import getStream, dateStream, getVideo
from asyncio import sleep
# load env variables
load_dotenv()
# env variables
WEBHOOK_URL = os.environ.get('WEBHOOK_URL')
APP_ID = os.environ.get('TWITCH_APP_ID')
APP_SECRET = os.environ.get('TWITCH_APP_SECRET')
ACCESS_TOKEN = os.environ.get('TWITTER_ACCESS_TOKEN')
ACCESS_SECRET = os.environ.get('TWITTER_ACCESS_SECRET')
CONSUMER_KEY = os.environ.get('TWITTER_CONSUMER_KEY')
CONSUMER_SECRET = os.environ.get('TWITTER_CONSUMER_SECRET')
PORT = os.environ.get('PORT', 8080)
TARGET_USERNAME = os.environ.get('TARGET_USERNAME')
LOGGING = os.environ.get('LOGGING')
if LOGGING == 'TRUE':
logging.basicConfig(level=logging.INFO)
# global variables
currentGame = None
currentTitle = None
streamId = None
online = False
gamesPlayed = list()
gamesBlacklist = ('Just Chatting', 'Watch Parties', 'Tabletop RPGs')
forever = threading.Event()
# login in twitch api
twitch = Twitch(APP_ID, APP_SECRET)
twitch.authenticate_app([])
# twitter authentication
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
api = tweepy.API(auth)
try:
api.verify_credentials()
print('Authentication Successful')
except:
print('Authentication Error')
# get the user_id from twitch user
uid = twitch.get_users(logins=[TARGET_USERNAME])
user_id = uid['data'][0]['id']
currentTitle = twitch.get_channel_information(user_id)['data'][0]['title']
# get the informations
try:
stream = twitch.get_streams(user_id=user_id)
currentGame = stream['data'][0]['game_name']
currentTitle = stream['data'][0]['title']
streamId = stream['data'][0]['id']
if currentGame not in gamesBlacklist:
gamesPlayed.append(currentGame)
online = True
except:
online = False
# functions callbacks
async def stream_online(data: dict):
global online, currentGame, streamId
stream = twitch.get_streams(user_id=user_id)
title = stream['data'][0]['title']
currentGame = stream['data'][0]['game_name']
streamId = stream['data'][0]['id']
api.update_status(f'Cellbit entrou ao vivo!\n\nTítulo: {title}\ntwitch.tv/cellbit')
online = True
async def stream_offline(data: dict):
global online, gamesPlayed
api.update_status('Cellbit encerrou a live!')
online = False
if len(gamesPlayed) > 0:
tweetId = api.user_timeline(screen_name='livesdocellbit')[0].id
date = dateStream()
status = f"[{date['day']}/{date['month']}/{date['year']}] Games Jogados:\n\n"
for game in gamesPlayed:
status += f'• {game}\n'
if getVideo['title'] != currentTitle:
status += f'\nVOD: https://twitchtracker.com/cellbit/streams/{streamId}'
else:
status += f'\nVOD: {getVideo()["link"]}'
api.update_status(status, in_reply_to_status_id = tweetId)
gamesPlayed = list()
async def channel_update(data: dict):
global currentGame, currentTitle, gamesPlayed
game = data['event']['category_name']
title = data['event']['title']
if game != currentGame and online == True:
# stream = twitch.get_streams(user_id=user_id)
timeVod = getStream()
h, m, s = timeVod['vodHours'], timeVod['vodMinutes'], timeVod['vodSeconds']
try:
import urllib.request
imageUrl = twitch.get_games(names=game)['data'][0]['box_art_url'].replace('{width}', '600').replace('{height}', '800')
urllib.request.urlretrieve(imageUrl, 'gameImg.jpg')
status = f'Cellbit está jogando: {game}\nTempo no VOD: {h}h{m}m{s}s\n\ntwitch.tv/cellbit'
if compareImages() or game == 'Just Chatting':
api.update_status(status)
else:
api.update_status_with_media(status, 'gameImg.jpg')
except:
api.update_status(status)
finally:
if game not in gamesPlayed and game not in gamesBlacklist:
gamesPlayed.append(game)
currentGame = game
# sleep(1)
# infoVideo = getVideo()
if title != currentTitle and online == False:
api.update_status(f'[TÍTULO] {title}')
currentTitle = title
# subscribe to EventSub
hook = EventSub(WEBHOOK_URL, APP_ID, PORT, twitch)
hook.unsubscribe_all()
hook.start()
print('Iniciando webhooks...\n')
hook.listen_channel_update(user_id, channel_update)
print('[OK] CHANNEL UPDATE - WEBHOOK')
hook.listen_stream_offline(user_id, stream_offline)
print('[OK] STREAM OFFLINE - WEBHOOK')
hook.listen_stream_online(user_id, stream_online)
print('[OK] STREAM ONLINE - WEBHOOK')
print('\n')
try:
print('Rodando!')
forever.wait()
finally:
hook.stop()