"""Module with utils for api package."""
import logging
from typing import (
BinaryIO, Optional,
Union, AsyncGenerator
)
from os import PathLike
from dataclasses import dataclass
from re import search as re_search
from base64 import urlsafe_b64encode
from telethon.tl.custom.file import File
from telethon.sessions import StringSession
from telethon.tl.types import Photo, Document
from telethon.tl.types.auth import SentCode
from telethon import TelegramClient as TTelegramClient
from telethon.errors import SessionPasswordNeededError
from telethon.tl.functions.auth import ResendCodeRequest
from ..defaults import VERSION
from ..fastelethon import download_file
from ..tools import anext, SearchFilter, _TypeList
from .db import TABLES, TgboxDB
__all__ = [
'search_generator',
'DirectoryRoot',
'PreparedFile',
'TelegramClient',
'DefaultsTableWrapper',
'RemoteBoxDefaults'
]
logger = logging.getLogger(__name__)
[docs]
class TelegramClient(TTelegramClient):
"""
A little extension to the ``telethon.TelegramClient``.
This class inherits Telethon's TelegramClient and support
all features that has ``telethon.TelegramClient``.
Typical usage:
.. code-block:: python
from asyncio import run as asyncio_run
from tgbox.api import TelegramClient, make_remotebox
from getpass import getpass # For hidden input
PHONE_NUMBER = '+10000000000' # Your phone number
API_ID = 1234567 # Your API_ID: https://my.telegram.org
API_HASH = '00000000000000000000000000000000' # Your API_HASH
async def main():
tc = TelegramClient(
phone_number = PHONE_NUMBER,
api_id = API_ID,
api_hash = API_HASH
)
await tc.connect()
await tc.send_code()
await tc.log_in(
code = int(input('Code: ')),
password = getpass('Pass: ')
)
erb = await make_remotebox(tc)
asyncio_run(main())
"""
__version__ = VERSION
def __init__(
self, api_id: int, api_hash: str,
phone_number: Optional[str] = None,
session: Optional[Union[str, StringSession]] = None,
**kwargs) -> None:
"""
.. note::
You should specify at least ``session`` or ``phone_number``.
Arguments:
api_id (``int``):
API_ID from https://my.telegram.org.
api_hash (``int``):
API_HASH from https://my.telegram.org.
phone_number (``str``, optional):
Phone number linked to your Telegram
account. You may want to specify it
to recieve log-in code. You should
specify it if ``session`` is ``None``.
session (``str``, ``StringSession``, optional):
``StringSession`` that give access to
your Telegram account. You can get it
after connecting and signing in via
``TelegramClient.session.save()`` method.
..tip::
This ``TelegramClient`` support all keywoard
arguments (**kwargs) that support parent
``telethon.TelegramClient`` object.
"""
if not session and not phone_number:
raise ValueError(
'You should specify at least ``session`` or ``phone_number``.'
)
super().__init__(
StringSession(session),
api_id, api_hash, **kwargs
)
self._api_id, self._api_hash = api_id, api_hash
self._phone_number = phone_number
[docs]
async def send_code(self, force_sms: Optional[bool]=False) -> SentCode:
"""
Sends the Telegram code needed to login to the given phone number.
Arguments:
force_sms (``bool``, optional):
Whether to force sending as SMS.
"""
logger.info(f'Sending login code to {self._phone_number}...')
return await self.send_code_request(
self._phone_number, force_sms=force_sms
)
[docs]
async def log_in(
self, password: Optional[str] = None,
code: Optional[Union[int,str]] = None) -> None:
"""
Logs in to Telegram to an existing user account.
You should only use this if you are not signed in yet.
Arguments:
password (``str``, optional):
Your 2FA password. You can ignore
this if you don't enabled it yet.
code (``int``, optional):
The code that Telegram sent you after calling
``TelegramClient.send_code()`` method.
"""
if not await self.is_user_authorized():
try:
logger.info(f'Trying to sign-in with {self._phone_number} and {code} code..')
await self.sign_in(self._phone_number, code)
except SessionPasswordNeededError:
logger.info(
'''Log-in without 2FA password failed. '''
f'''Trying to sign-in with {self._phone_number}, '''
f'''password and {code} code..'''
)
await self.sign_in(password=password)
else:
logger.debug(f'User {self._phone_number} is already authorized.')
[docs]
async def resend_code(self, sent_code: SentCode) -> SentCode:
"""
Will send you login code again. This can be used to
force Telegram send you SMS or Call to dictate code.
Arguments:
sent_code (``SentCode``):
Result of the ``tc.send_code`` or
result of the ``tc.resend_code`` method.
Example:
.. code-block:: python
tc = tgbox.api.TelegramClient(...)
sent_code = await tc.send_code()
sent_code = await tc.resend_code(sent_code)
"""
logger.info(f'Resending login code to {self._phone_number}...')
return await self(ResendCodeRequest(
self._phone_number, sent_code.phone_code_hash)
)
class TelegramVirtualFile:
"""
You can use this class for re-upload to RemoteBox
files that already was uploaded to any other
Telegram chat. Wrap it over ``Document`` and
specify in the ``DecryptedLocalBox.prepare_file``
"""
def __init__(self, document: Union[Photo, Document], tc: TelegramClient):
self.tc = tc
self.document = document
file = File(document)
self.name = file.name
self.size = file.size
self.mime = file.mime_type
self.duration = file.duration\
if file.duration else 0
self._downloader = None
def __repr__(self) -> str:
return (
f'''<class {self.__class__.__name__} @ '''
f'''{self.name=}, {self.size=}, {self.mime=}>'''
)
async def get_preview(self, quality: int=1) -> bytes:
if hasattr(self.document,'sizes')\
and not self.document.sizes:
return b''
if hasattr(self.document,'thumbs')\
and not self.document.thumbs:
return b''
return await self.tc.download_media(
message = self.document,
thumb = quality, file = bytes
)
async def read(self, size: int=-1) -> bytes:
"""Will return <= 512KiB of data. 'size' ignored"""
if not self._downloader:
self._downloader = download_file(
self.tc, self.document
)
chunk = await anext(self._downloader)
return chunk
[docs]
@dataclass
class PreparedFile:
"""
This dataclass store data needed for upload
by ``DecryptedRemoteBox.push_file`` in future.
Usually it's only for internal use.
"""
dlb: 'tgbox.api.local.DecryptedLocalBox'
file: BinaryIO
filekey: 'tgbox.keys.FileKey'
filesize: int
filepath: PathLike
filesalt: 'tgbox.crypto.FileSalt'
fingerprint: bytes
metadata: bytes
imported: bool
[docs]
def set_file_id(self, id: int):
"""You should set ID after pushing to remote"""
self.file_id = id
[docs]
def set_upload_time(self, upload_time: int):
"""You should set time after pushing to remote"""
self.upload_time = upload_time
[docs]
class DirectoryRoot:
"""
Type used to specify that you want to
access absolute local directory root.
This class doesn't have any methods,
please use it only for ``lbd.iterdir``
"""
[docs]
async def search_generator(
sf: SearchFilter, it_messages: Optional[AsyncGenerator] = None,
lb: Optional['tgbox.api.local.DecryptedLocalBox'] = None,
cache_preview: bool=True, reverse: bool=False) -> AsyncGenerator:
"""
Generator used to search for files in dlb and rb. It's
only for internal use and you shouldn't use it in your
own projects.
If file is exported from other RemoteBox and was imported to your
LocalBox, then you can specify ``dlb`` as ``lb``. AsyncGenerator
will try to get ``FileKey`` and decrypt ``EncryptedRemoteBoxFile``.
Otherwise imported file will be ignored.
"""
in_func = re_search if sf.in_filters['re'] else lambda p,s: p in s
if it_messages:
iter_from = it_messages
elif any((sf.in_filters['scope'], sf.ex_filters['scope'])):
if not sf.in_filters['scope']:
lbf = await anext(lb.files(), None)
if not lbf: return # Local doesn't have files
async def scope_generator(scope: Union[str, list]):
scope = scope if scope else DirectoryRoot
scope = scope if isinstance(scope, _TypeList) else [scope]
for current_scope in scope:
if current_scope is DirectoryRoot:
iterdir = lbf.directory.iterdir(ppid=current_scope)
elif hasattr(current_scope, '_part_id'):
iterdir = current_scope.iterdir()
else:
iterdir = await lb.get_directory(current_scope)
if not iterdir:
return
iterdir = iterdir.iterdir()
async for content in iterdir:
if hasattr(content, '_part_id'):
# This is DecryptedLocalBoxDirectory
if str(content) in sf.ex_filters['scope']\
or sf.in_filters['non_recursive_scope']:
continue # This directory is excluded
async for dlbf in scope_generator(content):
yield dlbf # This is DecryptedLocalBoxFile
else:
yield content # This is DecryptedLocalBoxFile
iter_from = scope_generator(sf.in_filters['scope'])
else:
min_id = sf.in_filters['min_id'][-1]\
if sf.in_filters['min_id'] else None
max_id = sf.in_filters['max_id'][-1]\
if sf.in_filters['max_id'] else None
iter_from = lb.files(
min_id = min_id,
max_id = max_id,
ids = sf.in_filters['id'],
cache_preview = cache_preview,
reverse = reverse
)
if not iter_from:
raise ValueError('At least it_messages or lb must be specified.')
async for file in iter_from:
if hasattr(file, '_rb'): # *RemoteBoxFile
file_size = file.file_size
elif hasattr(file, '_lb'): # *LocalBoxFile
file_size = file.size
else:
continue
if hasattr(file, 'file_path') and file.file_path:
file_path = str(file.file_path)
else:
file_path = ''
# We will use it as flags, the first
# is for 'include', the second is for
# 'exclude'. Both should be True to
# match SearchFilter filters.
yield_result = [True, True]
for indx, filter in enumerate((sf.in_filters, sf.ex_filters)):
if filter['imported']:
if bool(file.imported) != bool(filter['imported']):
if indx == 0: # O is Include
yield_result[indx] = False
break
elif bool(file.imported) == bool(filter['imported']):
if indx == 1: # 1 is Exclude
yield_result[indx] = False
break
for mime in filter['mime']:
if in_func(mime, file.mime):
if indx == 1:
yield_result[indx] = False
break
else:
if filter['mime']:
if indx == 0:
yield_result[indx] = False
break
if filter['min_time']:
if file.upload_time < filter['min_time'][-1]:
if indx == 0:
yield_result[indx] = False
break
elif file.upload_time >= filter['min_time'][-1]:
if indx == 1:
yield_result[indx] = False
break
if filter['max_time']:
if file.upload_time > filter['max_time'][-1]:
if indx == 0:
yield_result[indx] = False
break
elif file.upload_time <= filter['max_time'][-1]:
if indx == 1:
yield_result[indx] = False
break
if filter['min_size']:
if file_size < filter['min_size'][-1]:
if indx == 0:
yield_result[indx] = False
break
elif file_size >= filter['min_size'][-1]:
if indx == 1:
yield_result[indx] = False
break
if filter['max_size']:
if file_size > filter['max_size'][-1]:
if indx == 0:
yield_result[indx] = False
break
elif file_size <= filter['max_size'][-1]:
if indx == 1:
yield_result[indx] = False
break
if filter['min_id']:
if file.id < filter['min_id'][-1]:
if indx == 0:
yield_result[indx] = False
break
elif file.id >= filter['min_id'][-1]:
if indx == 1:
yield_result[indx] = False
break
if filter['max_id']:
if file.id > filter['max_id'][-1]:
if indx == 0:
yield_result[indx] = False
break
elif file.id <= filter['max_id'][-1]:
if indx == 1:
yield_result[indx] = False
break
for id in filter['id']:
if file.id == id:
if indx == 1:
yield_result[indx] = False
break
else:
if filter['id']:
if indx == 0:
yield_result[indx] = False
break
if hasattr(file, '_cattrs'):
for cattr in filter['cattrs']:
for k,v in cattr.items():
if k in file.cattrs:
if in_func(v, file.cattrs[k]):
if indx == 1:
yield_result[indx] = False
break
else:
if filter['cattrs']:
if indx == 0:
yield_result[indx] = False
break
# If it_messages is specified, then we're making search
# on the RemoteBox, thus, we can't use the "scope" filter,
# which is LocalBox-only; so we will treat it as the
# simple "file_path" filter to mimic "scope".
if it_messages:
sf_file_path = [*filter['file_path'], *filter['scope']]
else:
sf_file_path = filter['file_path']
for filter_file_path in sf_file_path:
if in_func(str(filter_file_path), file_path):
if indx == 1:
yield_result[indx] = False
break
else:
if sf_file_path:
if indx == 0:
yield_result[indx] = False
break
for file_name in filter['file_name']:
if in_func(file_name, file.file_name):
if indx == 1:
yield_result[indx] = False
break
else:
if filter['file_name']:
if indx == 0:
yield_result[indx] = False
break
for file_salt in filter['file_salt']:
if isinstance(file_salt, str):
fsalt = urlsafe_b64encode(file.file_salt.salt).decode()
else:
fsalt = file.file_salt
if in_func(file_salt, fsalt):
if indx == 1:
yield_result[indx] = False
break
else:
if filter['file_salt']:
if indx == 0:
yield_result[indx] = False
break
for verbyte in filter['verbyte']:
if verbyte == file.verbyte:
if indx == 1:
yield_result[indx] = False
break
else:
if filter['verbyte']:
if indx == 0:
yield_result[indx] = False
break
if all(yield_result):
logger.debug(f'SearchFilter matched ID{file.id}')
yield file
else:
logger.debug(f'SearchFilter mismatch ID{file.id} [{yield_result}]')
continue
[docs]
class DefaultsTableWrapper:
"""
This little class will wrap around the
DEFAULTS table of TGBOX DB and will
fetch all contents of it.
You can await the ``change`` coroutine
to change default values to your own.
"""
def __init__(self, tgbox_db: TgboxDB):
"""
Arguments:
tgbox_db (``TgboxDB``):
An initialized ``TgboxDB``.
"""
self._tgbox_db = tgbox_db
self._initialized = False
def __repr__(self) -> str:
return (f'{self.__class__.__name__}({repr(self._tgbox_db)})')
def __str__(self) -> str:
return (f'{self.__class__.__name__}({repr(self._tgbox_db)}) # {self._initialized=}')
@property
def initialized(self) -> bool:
return self._initialized
[docs]
async def init(self) -> 'DefaultsTableWrapper':
"""Fetch the defaults and initialize"""
logger.debug(
'''Initializing DefaultsTableWrapper for '''
f'''{self._tgbox_db._db_path} LocalBox'''
)
if self._tgbox_db.closed:
await self._tgbox_db.init()
defaults = await self._tgbox_db.DEFAULTS.select_once()
for default, value in zip(TABLES['DEFAULTS'], defaults):
setattr(self, default[0], value)
self._initialized = True
return self
[docs]
async def change(self, key: str, value) -> None:
"""
This method can change the defaults values
Arguments:
key (``str``):
Key to change, i.e METADATA_MAX.
value:
Key's new value.
.. warning::
We **don't** verify here that value
type corresponds to real type of Key
or that value doesn't overflow the
allowed value maximum. Be sure to
specify the correct Key values.
Example:
.. code-block:: python
from asyncio import run as asyncio_run
from tgbox.defaults import DEF_TGBOX_NAME
from tgbox.api.db import TgboxDB
from tgbox.api.utils import DefaultsTableWrapper
async def main():
# Make a DefaultsTableWrapper object
tdb = await TgboxDB(DEF_TGBOX_NAME).init()
dtw = await DefaultsTableWrapper(tdb).init()
# Change METADATA_MAX to the max allowed size
dtw.change('METADATA_MAX', 256**3-1)
# Access DTW from the DecryptedLocalBox
... # Some code was omited here
# Change the default download path
dlb.defaults.change('DOWNLOAD_PATH', 'Downloads')
asyncio_run(main())
"""
getattr(self, key) # Vetrify that Key exist
logger.info(f'Changing defaults | UPDATE DEFAULTS SET {key}={value}')
await self._tgbox_db.DEFAULTS.execute((
f'UPDATE DEFAULTS SET {key}=?', (value,)
))
[docs]
@dataclass
class RemoteBoxDefaults:
METADATA_MAX: int
FILE_PATH_MAX: int
DEF_UNK_FOLDER: Union[str, PathLike]
DEF_NO_FOLDER: Union[str, PathLike]
DOWNLOAD_PATH: Union[str, PathLike]