From 54e1af974bf67a0f71c9c8347bcc0052aed1459a Mon Sep 17 00:00:00 2001 From: Danish Date: Thu, 29 Apr 2021 15:48:46 +0530 Subject: [PATCH] v3.0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Fast Upload and Download • Multi audio + subtitles Support • Fixed Bugs --- helper/FastTelethon.py | 310 +++++++++++++++++++++++++++++++++++++++++ helper/__init__.py | 3 + helper/funcn.py | 8 +- helper/worker.py | 109 ++++++++++----- 4 files changed, 392 insertions(+), 38 deletions(-) create mode 100644 helper/FastTelethon.py diff --git a/helper/FastTelethon.py b/helper/FastTelethon.py new file mode 100644 index 00000000..2fe138e2 --- /dev/null +++ b/helper/FastTelethon.py @@ -0,0 +1,310 @@ +""" +> Based on parallel_file_transfer.py from mautrix-telegram, with permission to distribute under the MIT license +> Copyright (C) 2019 Tulir Asokan - https://github.com/tulir/mautrix-telegram +""" +import asyncio +import hashlib +import inspect +import logging +import math +import os +from collections import defaultdict +from typing import Optional, List, AsyncGenerator, Union, Awaitable, DefaultDict, Tuple, BinaryIO + +from telethon import utils, helpers, TelegramClient +from telethon.crypto import AuthKey +from telethon.network import MTProtoSender +from telethon.tl.alltlobjects import LAYER +from telethon.tl.functions import InvokeWithLayerRequest +from telethon.tl.functions.auth import ExportAuthorizationRequest, ImportAuthorizationRequest +from telethon.tl.functions.upload import (GetFileRequest, SaveFilePartRequest, + SaveBigFilePartRequest) +from telethon.tl.types import (Document, InputFileLocation, InputDocumentFileLocation, + InputPhotoFileLocation, InputPeerPhotoFileLocation, TypeInputFile, + InputFileBig, InputFile) + +filename = "" + +async_encrypt_attachment = None + +log: logging.Logger = logging.getLogger("telethon") + +TypeLocation = Union[Document, InputDocumentFileLocation, InputPeerPhotoFileLocation, + InputFileLocation, InputPhotoFileLocation] + + +class DownloadSender: + client: TelegramClient + sender: MTProtoSender + request: GetFileRequest + remaining: int + stride: int + + def __init__(self, client: TelegramClient, sender: MTProtoSender, file: TypeLocation, offset: int, limit: int, + stride: int, count: int) -> None: + self.sender = sender + self.client = client + self.request = GetFileRequest(file, offset=offset, limit=limit) + self.stride = stride + self.remaining = count + + async def next(self) -> Optional[bytes]: + if not self.remaining: + return None + result = await self.client._call(self.sender, self.request) + self.remaining -= 1 + self.request.offset += self.stride + return result.bytes + + def disconnect(self) -> Awaitable[None]: + return self.sender.disconnect() + + +class UploadSender: + client: TelegramClient + sender: MTProtoSender + request: Union[SaveFilePartRequest, SaveBigFilePartRequest] + part_count: int + stride: int + previous: Optional[asyncio.Task] + loop: asyncio.AbstractEventLoop + + def __init__(self, client: TelegramClient, sender: MTProtoSender, file_id: int, part_count: int, big: bool, + index: int, + stride: int, loop: asyncio.AbstractEventLoop) -> None: + self.client = client + self.sender = sender + self.part_count = part_count + if big: + self.request = SaveBigFilePartRequest(file_id, index, part_count, b"") + else: + self.request = SaveFilePartRequest(file_id, index, b"") + self.stride = stride + self.previous = None + self.loop = loop + + async def next(self, data: bytes) -> None: + if self.previous: + await self.previous + self.previous = self.loop.create_task(self._next(data)) + + async def _next(self, data: bytes) -> None: + self.request.bytes = data + log.debug(f"Sending file part {self.request.file_part}/{self.part_count}" + f" with {len(data)} bytes") + await self.client._call(self.sender, self.request) + self.request.file_part += self.stride + + async def disconnect(self) -> None: + if self.previous: + await self.previous + return await self.sender.disconnect() + + +class ParallelTransferrer: + client: TelegramClient + loop: asyncio.AbstractEventLoop + dc_id: int + senders: Optional[List[Union[DownloadSender, UploadSender]]] + auth_key: AuthKey + upload_ticker: int + + def __init__(self, client: TelegramClient, dc_id: Optional[int] = None) -> None: + self.client = client + self.loop = self.client.loop + self.dc_id = dc_id or self.client.session.dc_id + self.auth_key = (None if dc_id and self.client.session.dc_id != dc_id + else self.client.session.auth_key) + self.senders = None + self.upload_ticker = 0 + + async def _cleanup(self) -> None: + await asyncio.gather(*[sender.disconnect() for sender in self.senders]) + self.senders = None + + @staticmethod + def _get_connection_count(file_size: int, max_count: int = 20, + full_size: int = 100 * 1024 * 1024) -> int: + if file_size > full_size: + return max_count + return math.ceil((file_size / full_size) * max_count) + + async def _init_download(self, connections: int, file: TypeLocation, part_count: int, + part_size: int) -> None: + minimum, remainder = divmod(part_count, connections) + + def get_part_count() -> int: + nonlocal remainder + if remainder > 0: + remainder -= 1 + return minimum + 1 + return minimum + + # The first cross-DC sender will export+import the authorization, so we always create it + # before creating any other senders. + self.senders = [ + await self._create_download_sender(file, 0, part_size, connections * part_size, + get_part_count()), + *await asyncio.gather( + *[self._create_download_sender(file, i, part_size, connections * part_size, + get_part_count()) + for i in range(1, connections)]) + ] + + async def _create_download_sender(self, file: TypeLocation, index: int, part_size: int, + stride: int, + part_count: int) -> DownloadSender: + return DownloadSender(self.client, await self._create_sender(), file, index * part_size, part_size, + stride, part_count) + + async def _init_upload(self, connections: int, file_id: int, part_count: int, big: bool + ) -> None: + self.senders = [ + await self._create_upload_sender(file_id, part_count, big, 0, connections), + *await asyncio.gather( + *[self._create_upload_sender(file_id, part_count, big, i, connections) + for i in range(1, connections)]) + ] + + async def _create_upload_sender(self, file_id: int, part_count: int, big: bool, index: int, + stride: int) -> UploadSender: + return UploadSender(self.client, await self._create_sender(), file_id, part_count, big, index, stride, + loop=self.loop) + + async def _create_sender(self) -> MTProtoSender: + dc = await self.client._get_dc(self.dc_id) + sender = MTProtoSender(self.auth_key, loggers=self.client._log) + await sender.connect(self.client._connection(dc.ip_address, dc.port, dc.id, + loggers=self.client._log, + proxy=self.client._proxy)) + if not self.auth_key: + log.debug(f"Exporting auth to DC {self.dc_id}") + auth = await self.client(ExportAuthorizationRequest(self.dc_id)) + self.client._init_request.query = ImportAuthorizationRequest(id=auth.id, + bytes=auth.bytes) + req = InvokeWithLayerRequest(LAYER, self.client._init_request) + await sender.send(req) + self.auth_key = sender.auth_key + return sender + + async def init_upload(self, file_id: int, file_size: int, part_size_kb: Optional[float] = None, + connection_count: Optional[int] = None) -> Tuple[int, int, bool]: + connection_count = connection_count or self._get_connection_count(file_size) + part_size = (part_size_kb or utils.get_appropriated_part_size(file_size)) * 1024 + part_count = (file_size + part_size - 1) // part_size + is_large = file_size > 10 * 1024 * 1024 + await self._init_upload(connection_count, file_id, part_count, is_large) + return part_size, part_count, is_large + + async def upload(self, part: bytes) -> None: + await self.senders[self.upload_ticker].next(part) + self.upload_ticker = (self.upload_ticker + 1) % len(self.senders) + + async def finish_upload(self) -> None: + await self._cleanup() + + async def download(self, file: TypeLocation, file_size: int, + part_size_kb: Optional[float] = None, + connection_count: Optional[int] = None) -> AsyncGenerator[bytes, None]: + connection_count = connection_count or self._get_connection_count(file_size) + part_size = (part_size_kb or utils.get_appropriated_part_size(file_size)) * 1024 + part_count = math.ceil(file_size / part_size) + log.debug("Starting parallel download: " + f"{connection_count} {part_size} {part_count} {file!s}") + await self._init_download(connection_count, file, part_count, part_size) + + part = 0 + while part < part_count: + tasks = [] + for sender in self.senders: + tasks.append(self.loop.create_task(sender.next())) + for task in tasks: + data = await task + if not data: + break + yield data + part += 1 + log.debug(f"Part {part} downloaded") + + log.debug("Parallel download finished, cleaning up connections") + await self._cleanup() + + +parallel_transfer_locks: DefaultDict[int, asyncio.Lock] = defaultdict(lambda: asyncio.Lock()) + + +def stream_file(file_to_stream: BinaryIO, chunk_size=1024): + while True: + data_read = file_to_stream.read(chunk_size) + if not data_read: + break + yield data_read + + +async def _internal_transfer_to_telegram(client: TelegramClient, + response: BinaryIO, + progress_callback: callable + ) -> Tuple[TypeInputFile, int]: + file_id = helpers.generate_random_long() + file_size = os.path.getsize(response.name) + + hash_md5 = hashlib.md5() + uploader = ParallelTransferrer(client) + part_size, part_count, is_large = await uploader.init_upload(file_id, file_size) + buffer = bytearray() + for data in stream_file(response): + if progress_callback: + r = progress_callback(response.tell(), file_size) + if inspect.isawaitable(r): + await r + if not is_large: + hash_md5.update(data) + if len(buffer) == 0 and len(data) == part_size: + await uploader.upload(data) + continue + new_len = len(buffer) + len(data) + if new_len >= part_size: + cutoff = part_size - len(buffer) + buffer.extend(data[:cutoff]) + await uploader.upload(bytes(buffer)) + buffer.clear() + buffer.extend(data[cutoff:]) + else: + buffer.extend(data) + if len(buffer) > 0: + await uploader.upload(bytes(buffer)) + await uploader.finish_upload() + if is_large: + return InputFileBig(file_id, part_count, filename), file_size + else: + return InputFile(file_id, part_count, filename, hash_md5.hexdigest()), file_size + + +async def download_file(client: TelegramClient, + location: TypeLocation, + out: BinaryIO, + progress_callback: callable = None + ) -> BinaryIO: + size = location.size + dc_id, location = utils.get_input_location(location) + # We lock the transfers because telegram has connection count limits + downloader = ParallelTransferrer(client, dc_id) + downloaded = downloader.download(location, size) + async for x in downloaded: + out.write(x) + if progress_callback: + r = progress_callback(out.tell(), size) + if inspect.isawaitable(r): + await r + + return out + + +async def upload_file(client: TelegramClient, + file: BinaryIO, + name, + progress_callback: callable = None, + ) -> TypeInputFile: + global filename + filename = name + return (await _internal_transfer_to_telegram(client, file, progress_callback))[0] diff --git a/helper/__init__.py b/helper/__init__.py index 5b3c739a..40a413d0 100644 --- a/helper/__init__.py +++ b/helper/__init__.py @@ -39,3 +39,6 @@ from telethon.tl.functions.messages import ExportChatInviteRequest as cl from telethon.tl.functions.users import GetFullUserRequest from telethon.utils import get_display_name + +basicConfig(format="[%(levelname) 5s/%(asctime)s] %(name)s: %(message)s", level=INFO) +LOGS = getLogger(__name__) \ No newline at end of file diff --git a/helper/funcn.py b/helper/funcn.py index aa149058..ace68c07 100644 --- a/helper/funcn.py +++ b/helper/funcn.py @@ -17,7 +17,7 @@ COUNT = [] uptime = dt.now() -os.system("wget https://telegra.ph/file/75ee20ec8d8c8bba84f02.jpg") +os.system("wget https://telegra.ph/file/75ee20ec8d8c8bba84f02.jpg -O thumb.jpg") if not os.path.isdir("downloads/"): os.mkdir("downloads/") @@ -79,8 +79,8 @@ async def progress(current, total, event, start, type_of_ps, file=None): speed = current / diff time_to_completion = round((total - current) / speed) * 1000 progress_str = "`[{0}{1}] {2}%`\n\n".format( - "".join(["𒊹︎︎︎" for i in range(math.floor(percentage / 5))]), - "".join(["●" for i in range(20 - math.floor(percentage / 5))]), + "".join(["●" for i in range(math.floor(percentage / 5))]), + "".join(["○" for i in range(20 - math.floor(percentage / 5))]), round(percentage, 2), ) tmp = ( @@ -116,7 +116,7 @@ async def genss(file): async def duration_s(file): tsec = await genss(file) x = round(tsec / 5) - y = round(tsec / 5 + 60) + y = round(tsec / 5 + 30) pin = stdr(x) if y < tsec: pon = stdr(y) diff --git a/helper/worker.py b/helper/worker.py index b1393685..0ca0f9ec 100644 --- a/helper/worker.py +++ b/helper/worker.py @@ -14,7 +14,7 @@ from .funcn import * - +from .FastTelethon import download_file, upload_file async def screenshot(e): await e.edit("`Generating Screenshots...`") @@ -79,7 +79,7 @@ async def encc(e): [Button.inline("CANCEL PROCESS", data=f"skip{wah}")], ], ) - cmd = f"ffmpeg -i '{dl}' -preset ultrafast -vcodec libx265 -crf 28 '{out}' -y" + cmd = f'ffmpeg -i "{dl}" -preset ultrafast -c:v libx265 -crf 27 -map 0:v -c:a aac -map 0:a -c:s copy -map 0:s? "{out}" -y' process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) @@ -97,15 +97,21 @@ async def encc(e): ttt = time.time() await nn.delete() nnn = await e.client.send_message(e.chat_id, "`Uploading...`") + with open(out, "rb") as f: + ok = await upload_file( + client=e.client, + file=f, + name=out, + progress_callback=lambda d, t: asyncio.get_event_loop().create_task( + progress(d, t, nnn, ttt, "uploading..") + ), + ) ds = await e.client.send_file( e.chat_id, - file=f"{out}", + file=ok, force_document=True, - thumb=thum, - progress_callback=lambda d, t: asyncio.get_event_loop().create_task( - progress(d, t, nnn, ttt, "uploading..", file=f"{out}") - ), - ) + thumb=thum) + await nnn.delete() org = int(Path(dl).stat().st_size) com = int(Path(out).stat().st_size) pe = 100 - ((com / org) * 100) @@ -122,11 +128,11 @@ async def encc(e): ) await ds.forward_to(LOG) await dk.forward_to(LOG) - await nnn.delete() COUNT.remove(e.chat_id) os.remove(dl) os.remove(out) - except BaseException: + except Exception as er: + LOGS.info(er) return COUNT.remove(e.chat_id) @@ -143,7 +149,7 @@ async def sample(e): [Button.inline("CANCEL PROCESS", data=f"skip{wah}")], ], ) - ncmd = f"ffmpeg -i '{dl}' -preset ultrafast -ss {ss} -to {dd} -c:v libx265 -crf 28 '{out}'" + ncmd = f'ffmpeg -i "{dl}" -preset ultrafast -ss {ss} -to {dd} -c:v libx265 -crf 27 -map 0:v -c:a aac -map 0:a -c:s copy -map 0:s? "{out}" -y' process = await asyncio.create_subprocess_shell( ncmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) @@ -193,10 +199,10 @@ async def encod(event): user = await event.get_chat() if not event.media: return - try: + if hasattr(event.media, "document"): if not event.media.document.mime_type.startswith(("video","application/octet-stream")): return - except BaseException: + elif hasattr(event.media, "photo"): return try: oc = event.fwd_from.from_id.user_id @@ -206,13 +212,14 @@ async def encod(event): except BaseException: pass xxx = await event.reply("`Downloading...`") - pp = [] - # async for x in event.client.iter_participants("ensemblygroup"): + """ For Force Subscribe Channel""" + # pp = [] + # async for x in event.client.iter_participants("put group username"): # pp.append(x.id) # if (user.id) not in pp: # return await xxx.edit( # "U Must Subscribe This Channel To Use This Bot", - # buttons=[Button.url("JOIN CHANNEL", url="t.me/ensemblygroup")], + # buttons=[Button.url("JOIN CHANNEL", url="put group link")], # ) if len(COUNT) > 4 and user.id != OWNER: llink = (await event.client(cl(LOG))).link @@ -237,14 +244,40 @@ async def encod(event): if not os.path.isdir(dir): os.mkdir(dir) try: - dl = await event.client.download_media( - event.media, - dir, - progress_callback=lambda d, t: asyncio.get_event_loop().create_task( - progress(d, t, xxx, ttt, "Downloading") - ), - ) - except BaseException: + if hasattr(event.media, "document"): + file = event.media.document + mime_type = file.mime_type + filename = event.file.name + if not filename: + filename = ( + "video_" + dt.now().isoformat("_", "seconds") + ".mp4" + ) + dl = dir + filename + with open(dl, "wb") as f: + ok = await download_file( + client=event.client, + location=file, + out=f, + progress_callback=lambda d, t: asyncio.get_event_loop().create_task( + progress( + d, + t, + xxx, + ttt, + "Downloading", + ) + ), + ) + else: + dl = await event.client.download_media( + event.media, + dir, + progress_callback=lambda d, t: asyncio.get_event_loop().create_task( + progress(d, t, xxx, ttt, "Downloading") + ), + ) + except Exception as er: + LOGS.info(er) COUNT.remove(user.id) return os.remove(dl) es = dt.now() @@ -255,7 +288,7 @@ async def encod(event): os.mkdir(rr) bb = kk.replace(f".{aa}", " compressed.mkv") out = f"{rr}/{bb}" - thum = "75ee20ec8d8c8bba84f02.jpg" + thum = "thumb.jpg" dtime = ts(int((es - s).seconds) * 1000) hehe = f"{out};{dl};{thum};{dtime}" key = code(hehe) @@ -274,7 +307,8 @@ async def encod(event): [Button.inline("COMPRESS", data=f"sencc{key}")], ], ) - except BaseException: + except BaseException as er: + LOGS.info(er) return COUNT.remove(user.id) @@ -291,7 +325,7 @@ async def customenc(e, key): [Button.inline("CANCEL PROCESS", data=f"skip{wah}")], ], ) - cmd = f"ffmpeg -i '{dl}' -preset ultrafast -vcodec libx265 -crf 28 '{out}' -y" + cmd = f'ffmpeg -i "{dl}" -preset ultrafast -c:v libx265 -crf 27 -map 0:v -c:a aac -map 0:a -c:s copy -map 0:s? "{out}" -y' process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) @@ -311,16 +345,23 @@ async def customenc(e, key): await nn.delete() nnn = await e.client.send_message(e.chat_id, "`Uploading...`") try: + with open(out, "rb") as f: + ok = await upload_file( + client=e.client, + file=f, + name=out, + progress_callback=lambda d, t: asyncio.get_event_loop().create_task( + progress(d, t, nnn, ttt, "uploading..") + ), + ) ds = await e.client.send_file( e.chat_id, - file=f"{out}", + file=ok, force_document=True, - thumb=thum, - progress_callback=lambda d, t: asyncio.get_event_loop().create_task( - progress(d, t, nnn, ttt, "uploading..", file=f"{out}") - ), - ) - except BaseException: + thumb=thum) + await nnn.delete() + except Exception as er: + LOGS.info(er) COUNT.remove(e.chat_id) os.remove(dl) return os.remove(out)