Compare commits

...

2 Commits

Author SHA1 Message Date
ge
40f121ad84 upd 2023-07-19 06:45:26 +03:00
ge
980f9ea791 refactor & add postgres 2023-07-19 06:44:58 +03:00
6 changed files with 163 additions and 72 deletions

View File

@ -8,11 +8,11 @@
**Quicker** is a pythonic tool for querying databases.
Quicker wraps popular Python packages:
Quicker wraps Python bindings on DBMS libraries:
- `mysqlclient` for MySQL.
- `psycopg2` for PostgreSQL (not implemented yet).
- Python builtin `sqlite` for SQLite (not implemented yet).
- `psycopg2` for PostgreSQL.
- `sqlite` from Python standard library for SQLite (not implemented yet).
Connection parameters will passed to "backend" module as is.
@ -28,7 +28,7 @@ pip install git+https://git.nxhs.cloud/ge/quicker
```python
with Connection(**config) as db:
db.exec("sql query here...")
db.execute("sql query here...")
db.query("sql query here...") # query is alias for exec()
# Query is callable and you can also do this:
@ -38,9 +38,10 @@ with Connection(**config) as query:
`Query` cannot be called itself, you must use `Connection` to correctly initialise `Query` object. Available methods and properties:
- `query()`, `exec()`. Execute SQL. There is You can use here this syntax: `query('SELECT * FROM users WHERE id = %s', (15,))`.
- `query()`, `execute()`. Execute SQL. There is You can use here this syntax: `query('SELECT * FROM users WHERE id = %s', (15,))`.
- `commit()`. Write changes into database.
- `cursor`. Call [MySQLdb Cursor object](https://mysqlclient.readthedocs.io/user_guide.html#cursor-objects) methods directly.
- `cursor`. Access cursor object directly.
- `connection`. Access connection object directly.
Full example:
@ -88,7 +89,30 @@ Changing database:
from quicker import Connection
with Connection(provider='mysql', read_default_file='~/.my.cnf') as db:
db.query("INSERT INTO users VALUE (3, 'user2', 'user2@example.org')")
db.query("INSERT INTO `users` VALUE (3, 'user2', 'user2@example.org')")
```
Quicker by default make commit after closing context. Set option `commit=False` to disable automatic commit.
For logging add following code:
```
import logging
logging.basicConfig(level=logging.DEBUG)
```
Direct access to Cursor object:
```
from quicker import Connection, make_list
# config declaration here...
with Connection(**config) as db:
db.cursor.execute('SELECT `id`, `name`, `email` FROM `users` WHERE `name` = %s', ('John',))
users = db.cursor.fetchall()
# Note: user is tuple! Convert it to list of dicts!
colnames = [desc[0] for desc in db.cursor.description]
users_list = make_list(colnames, users)
```

View File

@ -2,7 +2,7 @@ version: '3.1'
services:
db:
mysql:
image: mysql:8
command: --default-authentication-plugin=mysql_native_password
restart: always
@ -14,6 +14,16 @@ services:
ports:
- 3306:3306
postgres:
image: postgres:15
restart: always
environment:
POSTGRES_USER: myuser
POSTGRES_PASSWORD: example
POSTGRES_DB: mydb
ports:
- 5432:5432
adminer:
image: adminer
restart: always

22
poetry.lock generated
View File

@ -1,23 +1,7 @@
# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand.
[[package]]
name = "mysqlclient"
version = "2.2.0"
description = "Python interface to MySQL"
category = "main"
optional = false
python-versions = ">=3.8"
files = [
{file = "mysqlclient-2.2.0-cp310-cp310-win_amd64.whl", hash = "sha256:68837b6bb23170acffb43ae411e47533a560b6360c06dac39aa55700972c93b2"},
{file = "mysqlclient-2.2.0-cp311-cp311-win_amd64.whl", hash = "sha256:5670679ff1be1cc3fef0fa81bf39f0cd70605ba121141050f02743eb878ac114"},
{file = "mysqlclient-2.2.0-cp38-cp38-win_amd64.whl", hash = "sha256:004fe1d30d2c2ff8072f8ea513bcec235fd9b896f70dad369461d0ad7e570e98"},
{file = "mysqlclient-2.2.0-cp39-cp39-win_amd64.whl", hash = "sha256:9c6b142836c7dba4f723bf9c93cc46b6e5081d65b2af807f400dda9eb85a16d0"},
{file = "mysqlclient-2.2.0-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:955dba905a7443ce4788c63fdb9f8d688316260cf60b20ff51ac3b1c77616ede"},
{file = "mysqlclient-2.2.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:530ece9995a36cadb6211b9787f0c9e05cdab6702549bdb4236af5e9b535ed6a"},
{file = "mysqlclient-2.2.0.tar.gz", hash = "sha256:04368445f9c487d8abb7a878e3d23e923e6072c04a6c320f9e0dc8a82efba14e"},
]
package = []
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "462894cf2fdfd3121ca0c3a328dce7012ba1cecd25ba04d8a853c1fc21dde8a3"
python-versions = "^3.8"
content-hash = "935b488be9f11b23f14aa1ce3bed4013d88bd77462534b5e4fe5bb82b485bfe9"

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "quicker"
version = "0.1.0"
version = "0.2.0"
description = "Query databases quickly."
authors = ["ge <ge@nixhacks.net>"]
license = "Unlicense"
@ -8,7 +8,6 @@ readme = "README.md"
[tool.poetry.dependencies]
python = "^3.8"
mysqlclient = "^1.4.6"
[build-system]
requires = ["poetry-core"]

View File

@ -1 +1,9 @@
from .main import Connection
# ____ _ __
# / __ \__ __(_)____/ /_____ _____
# / / / / / / / / ___/ //_/ _ \/ ___/
# / /_/ / /_/ / / /__/ ,< / __/ /
# \___\_\__,_/_/\___/_/|_|\___/_/
#
# Quicker -- pythonic tool for querying databases
from .main import Connection, make_list

View File

@ -1,5 +1,40 @@
import logging
import importlib.util
from enum import Enum
from typing import Optional, List, Union, Tuple
logger = logging.getLogger(__name__)
def _import(module_name: str, symbol: Optional[str] = None):
spec = importlib.util.find_spec(module_name)
if spec is None:
raise ImportError(f"Module '{module_name}' not found.")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if symbol:
try:
return getattr(module, symbol)
except AttributeError:
raise ImportError()
return module
def make_list(
column_names: List[str], rows: Union[tuple, Tuple[dict, ...]]
) -> List[dict]:
"""Convert output to list of dicts from tuples."""
data = []
for row in rows:
if isinstance(row, dict):
data.append(row)
else:
item = {}
for i in range(len(row)):
item[column_names[i]] = row[i]
data.append(item)
return data
class Provider(str, Enum):
@ -12,7 +47,12 @@ class Provider(str, Enum):
class Connection:
"""
Connection is context manager that allows to establish connection
with database.
with database and make queries. Example::
>>> from quicker import Connection
>>> with Connection(provider='mysql', read_default_file='~/.my.cnf') as q:
... server = q('SELECT * FROM `servers` WHERE `id` = %s', (1735781,))
>>>
"""
def __init__(self,
@ -22,64 +62,90 @@ class Connection:
):
if not provider:
raise ValueError('Database provider is not set')
self._provider = provider
self._provider = Provider(provider)
self._commit = commit
self._connection_args = kwargs
def __enter__(self):
logger.debug(f'Database provider={self._provider}')
# -- MySQL / MariaDB --
if self._provider == Provider.MYSQL:
MySQLdb = self._import('MySQLdb')
self._connection = MySQLdb.connect(**self._connection_args)
MySQLdb = _import('MySQLdb')
DictCursor = _import('MySQLdb.cursors', 'DictCursor')
try:
if self._connection_args['cursorclass']:
cursorclass = DictCursor
except KeyError:
cursorclass = DictCursor
self._connection = MySQLdb.connect(
**self._connection_args,
cursorclass=cursorclass,
)
logger.debug('Session started')
self._cursor = self._connection.cursor()
return Query(self)
# -- PostgreSQL --
if self._provider == Provider.POSTGRES:
psycopg2 = _import('psycopg2')
dbname = self._connection_args.pop('database')
self._connection_args['dbname'] = dbname
self._connection = psycopg2.connect(**self._connection_args)
self._cursor = self._connection.cursor()
return Query(self)
def __exit__(self, exception_type, exception_value, exception_traceback):
if self._provider == Provider.MYSQL:
if self._commit:
self._connection.commit()
logger.debug('Changes commited into database')
self._cursor.close()
self._connection.close()
def _import(self, lib: str):
spec = importlib.util.find_spec(lib)
if spec is None:
raise ImportError(f"Module '{lib}' not found.")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
logger.debug('Connection closed')
class Query:
def __init__(self, connect):
self._provider = connect._provider
self._connection = connect._connection
self.cursor = self._connection.cursor()
self._cursor = connect._cursor
def __call__(self, *args, **kwargs):
return self.exec(*args, **kwargs)
return self.execute(*args, **kwargs)
def query(self, *args, **kwargs):
return self.exec(*args, **kwargs)
return self.execute(*args, **kwargs)
def exec(self, *args, **kwargs):
"""Execute SQL query and return output if available."""
self.cursor.execute(*args, **kwargs)
self._fetchall = self.cursor.fetchall()
if self._fetchall is not None:
if self.cursor.description is not None:
self._field_names = [i[0] for i in self.cursor.description]
return self._conv(self._field_names, self._fetchall)
def commit(self):
"""Commit changes into database."""
def execute(self, *args, **kwargs) -> Union[List[dict], None]:
"""Execute SQL query and return list of dicts or None."""
if self._provider == Provider.MYSQL:
self._connection.commit()
self._cursor.execute(*args, **kwargs)
logger.debug(f'MySQLdb ran: {self._cursor._executed}')
self._fetchall = self._cursor.fetchall()
if self._provider == Provider.POSTGRES:
pgProgrammingError = _import('psycopg2', 'ProgrammingError')
self._cursor.execute(*args, **kwargs)
logger.debug(f'psycopg2 ran: {self._cursor.query}')
try:
self._fetchall = self._cursor.fetchall()
except pgProgrammingError as e:
self._fetchall = None
if self._fetchall is not None:
self._colnames = []
if self._cursor.description is not None:
self._colnames = [
desc[0] for desc in self._cursor.description
]
return make_list(self._colnames, self._fetchall)
return None
def _conv(self, field_names, rows) -> list[dict]:
data = []
for row in rows:
item = {}
for i in range(len(row)):
item[field_names[i]] = row[i]
data.append(item)
return data
def commit(self) -> None:
"""Commit changes into database."""
self._connection.commit()
logger.debug('Changes commited into database')
@property
def connection(self):
return self._connection
@property
def cursor(self):
return self._cursor