Source code for tgbox.api.db

"""This module stores wrappers around Tgbox SQL DB."""

import logging

from typing import (
    Optional, Union,
    AsyncGenerator
)
from os import PathLike
from pathlib import Path

import aiosqlite

from ..errors import PathIsDirectory
from ..tools import anext
from .. import defaults


__all__ = ['SqlTableWrapper', 'TgboxDB', 'TABLES']

logger = logging.getLogger(__name__)

TABLES = {
    'BOX_DATA': (
        ('BOX_CHANNEL_ID', 'BLOB NOT NULL'),
        ('BOX_CR_TIME', 'BLOB NOT NULL'),
        ('BOX_SALT', 'BLOB NOT NULL'),
        ('MAINKEY', 'BLOB'),
        ('SESSION', 'BLOB NOT NULL'),
        ('API_ID', 'BLOB NOT NULL'),
        ('API_HASH', 'BLOB NOT NULL'),
        ('FAST_SYNC_LAST_EVENT_ID', 'BLOB')
    ),
    'FILES': (
        ('ID', 'INTEGER PRIMARY KEY'),
        ('UPLOAD_TIME', 'BLOB NOT NULL'),
        ('PPATH_HEAD', 'BLOB NOT NULL'),
        ('FILEKEY', 'BLOB'),
        ('FINGERPRINT', 'BLOB'),
        ('METADATA', 'BLOB NOT NULL'),
        ('UPDATED_METADATA', 'BLOB')
    ),
    'PATH_PARTS': (
        ('ENC_PART', 'BLOB NOT NULL'),
        ('PART_ID', 'BLOB NOT NULL PRIMARY KEY'),
        ('PARENT_PART_ID', 'BLOB')
    ),
    # If you add any values to DEFAULTS, then update the
    # tgbox.utils.RemoteBoxDefaults, as well as all
    # RemoteBoxDefaults() in remote.py, because they
    # are not automatic
    'DEFAULTS': (                            # Default value
        ('METADATA_MAX', 'INTEGER NOT NULL', int(defaults.Limits.METADATA_MAX)),
        ('FILE_PATH_MAX', 'INTEGER NOT NULL', int(defaults.Limits.FILE_PATH_MAX)),

        ('DOWNLOAD_PATH', 'TEXT NOT NULL', str(defaults.DOWNLOAD_PATH)),
        ('DEF_NO_FOLDER', 'TEXT NOT NULL', str(defaults.DEF_NO_FOLDER)),
        ('DEF_UNK_FOLDER', 'TEXT NOT NULL', str(defaults.DEF_UNK_FOLDER)),
        ('FAST_SYNC_ENABLED', 'INTEGER NOT NULL', int(defaults.FAST_SYNC_ENABLED))
    )
}
[docs] class SqlTableWrapper: """A low-level wrapper to SQLite Tables.""" def __init__(self, aiosql_conn, table_name: str): self._table_name = table_name self._aiosql_conn = aiosql_conn def __repr__(self) -> str: return f'<class {self.__class__.__name__}(aiosql_conn, "{self._table_name}")>' async def __aiter__(self) -> tuple: """Will yield rows as self.select without ``sql_statement``""" async for row in self.select(): yield row @property def table_name(self) -> str: """Returns table name""" return self._table_name
[docs] async def count_rows(self) -> int: """Execute ``SELECT count(*) from TABLE_NAME``""" logger.debug(f'SELECT count(*) FROM {self._table_name}') cursor = await self._aiosql_conn.execute( f'SELECT count(*) FROM {self._table_name}' ) return (await cursor.fetchone())[0]
[docs] async def select(self, sql_tuple: Optional[tuple] = None) -> AsyncGenerator: """ If ``sql_tuple`` isn't specified, then will be used ``(SELECT * FROM TABLE_NAME, ())`` statement. """ if not sql_tuple: sql_tuple = (f'SELECT * FROM {self._table_name}',()) logger.debug(f'self._aiosql_conn.execute(*{sql_tuple})') cursor = await self._aiosql_conn.execute(*sql_tuple) async for row in cursor: yield row
[docs] async def select_once(self, sql_tuple: Optional[tuple] = None) -> tuple: """ Will return first row which match the ``sql_tuple``, see ``select()`` method for ``sql_tuple`` details. """ return await anext(self.select(sql_tuple=sql_tuple))
[docs] async def insert( self, *args, sql_statement: Optional[str] = None, commit: bool=True) -> None: """ If ``sql_statement`` isn't specified, then will be used ``INSERT INTO TABLE_NAME values (...)``. This method doesn't check if you insert correct data or correct amount of it, you should know DB structure. """ if not sql_statement: sql_statement = ( f'INSERT INTO {self._table_name} values (' + ('?,' * len(args))[:-1] + ')' ) logger.debug(f'self._aiosql_conn.execute({sql_statement}, {args})') await self._aiosql_conn.execute(sql_statement, args) if commit: logger.debug('self._aiosql_conn.commit()') await self._aiosql_conn.commit()
[docs] async def execute(self, sql_tuple: tuple, commit: bool=True): logger.debug(f'self._aiosql_conn.execute(*{sql_tuple})') result = await self._aiosql_conn.execute(*sql_tuple) if commit: logger.debug('self._aiosql_conn.commit()') await self._aiosql_conn.commit() return result # Returns Cursor object
[docs] async def commit(self) -> None: logger.info('SqlTableWrapper._aiosql_conn.commit()') await self._aiosql_conn.commit()
[docs] class TgboxDB: def __init__(self, db_path: Union[PathLike, str]): """ Arguments: db_path (``PathLike``, ``str``): Path to the Tgbox DB. """ if isinstance(db_path, PathLike): self._db_path = db_path else: self._db_path = Path(db_path) if self._db_path.is_dir(): raise PathIsDirectory('Path is directory.') self._db_path.parent.mkdir(exist_ok=True, parents=True) self._aiosql_db = None self._aiosql_db_is_closed = None self._initialized = False self._name = self._db_path.name def __str__(self) -> str: return ( f'{self.__class__.__name__}("{str(self._db_path)}") ' '# {self._initialized=}') def __repr__(self) -> str: return f'{self.__class__.__name__}("{str(self._db_path)}")' @property def name(self) -> str: """Returns TgboxDB name""" return self._name @property def db_path(self) -> PathLike: """Returns a path to TgboxDB file""" return self._db_path @property def initialized(self) -> bool: """Will return True if TgboxDB is initialized""" return self._initialized @property def closed(self) -> bool: """ This method will return ``None`` if DB wasn't opened, False if it's still opened, True if it's was closed. """ return self._aiosql_db_is_closed
[docs] @staticmethod async def create(db_path: Union[str, PathLike]) -> 'TgboxDB': """Will initialize TgboxDB""" return await TgboxDB(db_path).init()
[docs] async def close(self) -> None: """Will close TgboxDB""" logger.info(f'{self._db_path} @ self._aiosql_db.close()') await self._aiosql_db.close() self._aiosql_db_is_closed = True
[docs] async def init(self) -> 'TgboxDB': logger.debug(f'tgbox.api.db.TgboxDB.init("{self._db_path}")') logger.info(f'Opening SQLite connection to {self._db_path}') self._aiosql_db = await aiosqlite.connect(self._db_path) for table, data in TABLES.items(): try: columns = ', '.join((f'{i[0]} {i[1]}' for i in data)) await self._aiosql_db.execute( f'CREATE TABLE {table} ({columns})' ) if table == 'DEFAULTS': sql_statement = ( f'INSERT INTO {table} VALUES (' + ('?,' * len(data))[:-1] + ')' ) await self._aiosql_db.execute( sql_statement, [i[2] for i in data] ) except aiosqlite.OperationalError: # Table exists # The code below will update TgboxDB schema if it's outdated table_columns_ = await self._aiosql_db.execute( f'PRAGMA table_info({table})' ) old_table_columns = set() for i in await table_columns_.fetchall(): column = ( # Does not currently respect UNIQUE and will constantly i[1], # re-generate Table on startup. TODO. i[2] + (' NOT NULL' if i[3] else '')\ + (' PRIMARY KEY' if i[5] else '') ) old_table_columns.add(column) required_columns = set((i[0:2] for i in data)) if old_table_columns == required_columns: continue # Schemas are about same logger.info(f'TgboxDB {self._db_path} seems outdated. Updating...') table_columns = [i[:2] for i in data] old_table_columns = set(i[0] for i in old_table_columns) old_table_columns_str = ', '.join( i[0] for i in table_columns if i[0] in old_table_columns ) new_table_columns = table_columns[len(old_table_columns):] old_rows = await self._aiosql_db.execute( f'SELECT {old_table_columns_str} from {table}') columns_schema = [' '.join(i) for i in table_columns] columns_schema = ','.join(columns_schema).rstrip(',') logger.debug(f'CREATE TABLE "updated!{table}" ({columns})') await self._aiosql_db.execute( 'CREATE TABLE IF NOT EXISTS ' f'"updated!{table}" ({columns_schema})' ) new_values = [] for column in new_table_columns: for default_data in data: if default_data[0] == column[0]: if len(default_data) > 2: new_values.append(default_data[2]) else: new_values.append(None) q = ('?,'*len(table_columns)).rstrip(',') columns_insert = (i[0] for i in table_columns) columns_insert = ','.join(columns_insert).rstrip(',') while (fetch := await old_rows.fetchone()): await self._aiosql_db.execute( f'INSERT INTO "updated!{table}" ' f'({columns_insert}) VALUES ({q})', (*fetch, *new_values) ) logger.debug(f'DROP TABLE {table}') await self._aiosql_db.execute(f'DROP TABLE {table}') logger.debug(f'ALTER TABLE "updated!{table}" RENAME TO {table}') await self._aiosql_db.execute( f'ALTER TABLE "updated!{table}" RENAME TO {table}' ) logger.info('TgboxDB._aiosql_conn.commit()') await self._aiosql_db.commit() self._aiosql_db_is_closed = False tables = await self._aiosql_db.execute( 'SELECT name FROM sqlite_master WHERE type="table"' ) for table in (await tables.fetchall()): setattr(self, table[0], SqlTableWrapper(self._aiosql_db, table[0])) self._initialized = True return self