-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbot.py
430 lines (315 loc) · 12.2 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
import os
from bs4 import BeautifulSoup
import requests
import json
import logging
from pathlib import Path
import random
from dotenv import load_dotenv
from telegram import LabeledPrice, Update
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
MessageHandler,
PreCheckoutQueryHandler,
filters,
)
from whoosh.index import open_dir
from whoosh.qparser import QueryParser
from whoosh.query import Query
from whoosh.searching import Searcher
# Module-level configuration. load_dotenv() is called before the os.getenv()
# reads below (presumably so values from a .env file are visible — confirm
# against the python-dotenv docs).
load_dotenv()
# os.getenv returns None for any variable that is not set.
BOT_TOKEN = os.getenv("BOT_TOKEN")
PAYMENT_TOKEN = os.getenv("PAYMENT_TOKEN")
BLOG_NAME = os.getenv("BLOG_NAME")
BLOG_FEED = os.getenv("BLOG_FEED")
# NOTE(review): int(None) raises TypeError, so importing this module fails
# unless ADMIN_ID is set to an integer string.
ADMIN_ID = int(os.getenv("ADMIN_ID"))
# Whoosh index opened from the "data" directory (relative to the working
# directory); used by the search handler.
index = open_dir("data")
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
def load_config():
    """Return the bot's persisted settings.

    The settings are read from ``config.json`` located next to this module.
    An empty dict is returned when that file does not exist; otherwise the
    parsed JSON content is returned as-is.
    """
    location = Path(__file__).parent / "config.json"
    if location.exists():
        with open(location) as handle:
            return json.load(handle)
    return {}
def update_config(**kwargs):
    """Merge ``kwargs`` into the stored settings and rewrite ``config.json``.

    The current settings are loaded with ``load_config()``, the given
    keyword arguments overwrite or add top-level keys, and the result is
    written back (4-space indented JSON) next to this module.

    Returns:
        None (the original returned the result of ``json.dump``, which is None).
    """
    settings = load_config()
    settings.update(kwargs)
    target = Path(__file__).parent / "config.json"
    with open(target, "w") as handle:
        json.dump(settings, handle, indent=4)
    return None
def add_notify(user_id):
    """Add ``user_id`` to the stored "notifications" list (no duplicates)."""
    # Round-trip through a set so an already-subscribed user is not stored twice.
    subscribers = {*load_config().get("notifications", []), user_id}
    update_config(notifications=list(subscribers))
def remove_notify(user_id):
    """Remove ``user_id`` from the stored "notifications" list.

    Removing a user who is not subscribed is a no-op. The previous
    ``set.remove`` call raised ``KeyError`` in that case, which made the
    /mute handler fail for anyone who had never sent /notify.
    """
    subscribers = set(load_config().get("notifications", []))
    if user_id not in subscribers:
        # Nothing to change; skip the needless config rewrite as well.
        return
    subscribers.discard(user_id)
    update_config(notifications=list(subscribers))
## BOT METHODS
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: greet the user and process an optional deep-link argument.

    The first argument, when present, is interpreted as follows:

    * ``donate_<amount>`` sends a donation invoice for ``<amount>`` dollars.
      A malformed or non-positive amount is reported to the user instead of
      raising ``ValueError`` or requesting an invalid invoice.
    * anything else is forwarded to ``unlock_post``, which filters locked
      posts by that argument (the admin listing builds ``?start=<slug>`` links).
    """
    _register_user(update)
    await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text=f"""
Welcome to {BLOG_NAME}. I can help you search for
posts and read them right here in Telegram, as well as unlock
premium posts.
Send /help for detailed instructions.""",
    )
    if not context.args:
        return

    param = context.args[0]
    if not param.startswith("donate_"):
        await unlock_post(update, context)
        return

    # The prefix check guarantees at least one "_", so index 1 always exists.
    try:
        amount = int(param.split("_")[1])
    except ValueError:
        amount = 0
    if amount <= 0:
        await context.bot.send_message(
            chat_id=update.effective_chat.id,
            text="Invalid donation amount.",
        )
        return
    await _donate(update, context, amount)
def _register_user(update):
    """Record the update's effective user id in the stored "users" list.

    The list is de-duplicated through a set before it is written back.
    """
    known = set(load_config().get("users", []))
    known.add(update.effective_user.id)
    update_config(users=list(known))
async def help(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /help: register the user and send the command overview."""
    _register_user(update)
    instructions = f"""
Welcome to {BLOG_NAME}.
I can help you search for posts and read them right here in Telegram, as well as unlock premium posts.
- Send /latest to see the last 10 posts published.
- Send /notify to receive a notification here every time a new article is posted (/mute to cancel).
- Send /search followed by some text to find for relevant posts related to that text. This is a simple text-based search.
- Send /unlock to see a list of premium posts that you can buy individually.
- Send /donate to make a donation. You can add any desired amount (default $1).
"""
    await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text=instructions,
    )
async def notify(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /notify: subscribe the sender to notifications and confirm."""
    _register_user(update)
    add_notify(update.effective_user.id)
    confirmation = "Notifications are turned on. Send /mute to turn off."
    await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text=confirmation,
    )
async def mute(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /mute: unsubscribe the sender from notifications and confirm."""
    remove_notify(update.effective_user.id)
    confirmation = "Notifications are turned off. Send /notify to turn on."
    await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text=confirmation,
    )
## SEARCH
async def search(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /search: query the index's "content" field with the command text.

    Up to 10 hits are returned as one markdown message of linked titles
    with subtitles; a plain notice is sent when nothing matches.
    """
    _register_user(update)
    terms = " ".join(context.args)
    parsed = QueryParser("content", index.schema).parse(terms)

    # Hits are read while the searcher is still open.
    with index.searcher() as searcher:
        lines = [
            f"**[{hit['title']}]({hit['path']}):** {hit['subtitle']}"
            for hit in searcher.search(parsed, limit=10)
        ]

    if lines:
        await context.bot.send_message(
            update.effective_chat.id,
            text="\n\n".join(lines),
            parse_mode="markdown",
            disable_web_page_preview=True,
        )
        return

    await context.bot.send_message(
        update.effective_chat.id,
        text="No relevant articles found.",
    )
async def latest(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /latest: list the 10 most recent posts from ``data/items.json``.

    Posts are ordered by their "date" value, newest first, and sent as one
    markdown message; a plain notice is sent when the catalogue is empty.
    """
    _register_user(update)
    with open("data/items.json") as source:
        catalog = json.load(source)

    newest_first = sorted(
        catalog.values(), key=lambda entry: entry["date"], reverse=True
    )
    lines = [
        f"**[{entry['title']}]({entry['path']}):** {entry['subtitle']}"
        for entry in newest_first[:10]
    ]

    if lines:
        await context.bot.send_message(
            update.effective_chat.id,
            text="\n\n".join(lines),
            parse_mode="markdown",
            disable_web_page_preview=True,
        )
        return

    await context.bot.send_message(
        update.effective_chat.id,
        text="No articles found.",
    )
async def random_post(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /random: send one randomly chosen post from ``data/items.json``.

    When the catalogue is empty a "No articles found." notice is sent (the
    same text /latest uses) instead of letting ``random.choice`` raise
    ``IndexError`` on an empty sequence.
    """
    _register_user(update)
    with open("data/items.json") as fp:
        items = json.load(fp)

    if not items:
        await context.bot.send_message(
            update.effective_chat.id,
            text="No articles found.",
        )
        return

    post = random.choice(list(items.values()))
    await context.bot.send_message(
        update.effective_chat.id,
        text=f"**[{post['title']}]({post['path']}):** {post['subtitle']}",
        parse_mode="markdown",
    )
## PAYMENT
async def donate(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /donate [amount]: send a donation invoice.

    The amount is a whole number of dollars and defaults to 1 when no
    argument is given. A non-numeric or non-positive argument is answered
    with a usage hint instead of raising ``ValueError`` or requesting an
    invoice with an invalid price.
    """
    amount = 1
    if context.args:
        try:
            amount = int(context.args[0])
        except ValueError:
            amount = 0  # rejected by the check below

    if amount <= 0:
        await context.bot.send_message(
            update.effective_chat.id,
            text="Please send a positive whole-dollar amount, e.g. /donate 5.",
        )
        return

    await _donate(update, context, amount)
async def _donate(update: Update, context: ContextTypes.DEFAULT_TYPE, amount: int):
    """Send a USD donation invoice for ``amount`` dollars to the current chat.

    The invoice payload is ``donate:<amount>``; the price passed to
    ``LabeledPrice`` is ``amount * 100``.
    """
    invoice_title = f"Donate ${amount}"
    invoice_description = f"Send a donation of ${amount}.00 USD."
    invoice_payload = "donate:%i" % amount
    line_items = [LabeledPrice("Donation", amount * 100)]
    await context.bot.send_invoice(
        update.effective_chat.id,
        invoice_title,
        invoice_description,
        invoice_payload,
        PAYMENT_TOKEN,
        "USD",
        prices=line_items,
    )
async def lock_post(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /lock <public> <secret> <price> (admin only).

    Stores ``{"secret": ..., "price": ...}`` under ``public`` in the
    "locked" section of the config and confirms to the chat. Updates from
    anyone other than ``ADMIN_ID`` are ignored.

    Raises:
        ValueError: if the command does not carry exactly three arguments
            or the price is not an integer.
    """
    if update.effective_user.id != ADMIN_ID:
        return

    public, secret, raw_price = context.args
    entry = {"secret": secret, "price": int(raw_price)}

    registry = load_config().get("locked", {})
    registry[public] = entry
    update_config(locked=registry)

    await context.bot.send_message(
        update.effective_chat.id,
        text=f"**{public}** has been locked.",
        parse_mode="markdown",
    )
async def locked(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /list (admin only): send one message per locked post.

    Each message shows the public key, the price (stored value divided by
    100), the secret, and a ``t.me`` deep link whose ``start`` parameter is
    the last ``/``-separated segment of the public key.
    """
    if update.effective_user.id != ADMIN_ID:
        return

    for public_key, details in load_config().get("locked", {}).items():
        slug = public_key.split("/")[-1]
        summary = f"""
**Public**: {public_key}
**Price:** ${details['price']/100:.2f}
**Secret:** {details['secret']}
**Copy unlock URL**
`https://t.me/{context.bot.username}?start={slug}`
"""
        await context.bot.send_message(ADMIN_ID, text=summary, parse_mode="markdown")
async def unlock_post(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /unlock [suffix]: send a purchase invoice for each locked post.

    Only locked entries that also exist in ``data/items.json`` are offered.
    When an argument is given, the list is narrowed to paths ending with it.
    If nothing is left, with or without an argument, the user is told so
    rather than receiving no reply at all.
    """
    _register_user(update)
    with open("data/items.json") as fp:
        all_items = json.load(fp)

    locked = load_config().get("locked", {})
    candidates = [path for path in locked if path in all_items]
    if context.args:
        suffix = context.args[0]
        candidates = [path for path in candidates if path.endswith(suffix)]

    if not candidates:
        await context.bot.send_message(
            update.effective_chat.id, text="No matching posts."
        )
        return

    for path in candidates:
        post = all_items[path]
        # The stored price is passed to LabeledPrice unchanged; the admin
        # listing displays the same value divided by 100, so it is in cents.
        prices = [LabeledPrice("Unlock post", locked[path]["price"])]
        # NOTE(review): the Bot API documents a 1-32 character limit for
        # invoice titles — confirm long post titles are accepted.
        await context.bot.send_invoice(
            update.effective_chat.id,
            post["title"],
            f"Unlock this article to read the full version. Original: {path}",
            path,  # payload: the locked path, checked again at pre-checkout
            PAYMENT_TOKEN,
            "USD",
            prices,
            # A post without an image still gets an invoice (photo_url=None).
            photo_url=post.get("image_url"),
        )
# after (optional) shipping, it's the pre-checkout
async def precheckout_callback(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Answer the pre-checkout query.

    The query is approved when its invoice payload starts with ``donate:``
    or is a key of the "locked" section of the config; any other payload is
    rejected with a generic error message.
    """
    query = update.pre_checkout_query
    locked_posts = load_config().get("locked", {})
    payload = query.invoice_payload

    recognised = payload.startswith("donate:") or payload in locked_posts
    if recognised:
        await query.answer(ok=True)
    else:
        await query.answer(ok=False, error_message="Something went wrong...")
async def successful_payment_callback(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Confirm a successful payment to the payer and notify the admin.

    A payload matching a locked post gets a reply linking to the post's
    secret URL; a ``donate:`` payload gets a thank-you reply. The admin is
    then notified of every payment. The amount in that notice is shown in
    major units with its currency code (e.g. ``5.00 USD``) rather than the
    raw ``total_amount``, which Telegram reports in the smallest currency
    unit.
    """
    payment = update.effective_message.successful_payment
    payload = payment.invoice_payload
    locked = load_config().get("locked", {})

    if payload in locked:
        item = locked[payload]
        await update.effective_message.reply_text(
            f"""Thank you for your purchase!\n\nYou can read the full post [here]({item['secret']}).""",
            parse_mode="markdown",
        )
    if payload.startswith("donate:"):
        await update.effective_message.reply_text(
            "Thank you for your donation!",
            parse_mode="markdown",
        )

    payer = update.effective_user.username or update.effective_user.id
    # Dividing by 100 is correct for USD, the only currency this bot invoices
    # in; the currency code is included so the figure is unambiguous.
    paid = f"{payment.total_amount / 100:.2f} {payment.currency}"
    await context.bot.send_message(
        ADMIN_ID,
        text=f"💵 User @{payer} paid {paid} for {payload}.",
    )
## ADMIN
async def config(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /config (admin only): send the config file as a document.

    The file is resolved next to this module, the same location
    ``load_config`` and ``update_config`` use, instead of the bare string
    ``"config.json"``, which depended on the working directory. If the file
    has not been created yet, the admin is told so.
    """
    if update.effective_user.id != ADMIN_ID:
        return

    config_path = Path(__file__).parent / "config.json"
    if not config_path.exists():
        await context.bot.send_message(
            update.effective_chat.id,
            text="No config file has been created yet.",
        )
        return

    await context.bot.send_document(update.effective_chat.id, config_path)
async def broadcast(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /broadcast (admin only): copy the replied-to message to users.

    Recipients:
        * no arguments: the stored "notifications" list
        * ``all``: the stored "users" list
        * otherwise: the arguments parsed as integer user ids

    The command must be sent as a reply to the message to broadcast. A
    delivery failure for one user is logged and does not stop the rest.
    The final report counts successful deliveries and mentions failures.
    """
    if update.effective_user.id != ADMIN_ID:
        return

    # Local import: the file's top-level imports do not include telegram.error.
    from telegram.error import TelegramError

    message = update.effective_message.reply_to_message
    if message is None:
        await context.bot.send_message(
            update.effective_chat.id,
            "Reply to the message you want to broadcast with /broadcast.",
        )
        return

    config = load_config()
    if not context.args:
        users = config.get("notifications", [])
    elif context.args[0] == "all":
        users = config.get("users", [])
    else:
        try:
            users = [int(u) for u in context.args]
        except ValueError:
            await context.bot.send_message(
                update.effective_chat.id,
                "Usage: /broadcast, /broadcast all, or /broadcast <user_id> ...",
            )
            return

    delivered = 0
    for user in users:
        try:
            await message.copy(chat_id=user)
        except TelegramError as exc:
            logging.warning("Broadcast to %s failed: %s", user, exc)
        else:
            delivered += 1

    report = f"Notified {delivered} users."
    failed = len(users) - delivered
    if failed:
        report += f" {failed} deliveries failed."
    await context.bot.send_message(update.effective_chat.id, report)
## MAIN
def main():
    """Build the application, register every handler, and start polling."""
    application = ApplicationBuilder().token(BOT_TOKEN).concurrent_updates(True).build()

    # Command name -> callback, registered in this order.
    commands = (
        ("start", start),
        ("help", help),
        ("broadcast", broadcast),
        ("config", config),
        ("notify", notify),
        ("mute", mute),
        ("search", search),
        ("random", random_post),
        ("latest", latest),
        ("lock", lock_post),
        ("list", locked),
        ("unlock", unlock_post),
        ("donate", donate),
    )
    for name, callback in commands:
        application.add_handler(CommandHandler(name, callback))

    # Payment flow: pre-checkout confirmation, then the success message.
    application.add_handler(PreCheckoutQueryHandler(precheckout_callback))
    application.add_handler(
        MessageHandler(filters.SUCCESSFUL_PAYMENT, successful_payment_callback)
    )

    application.run_polling()


if __name__ == "__main__":
    main()