"""
This module provides tools to create/drop and use databases specified by DB URL. This module only
handles SQLite and PostgreSQL, but abstracts the differences between the two.
"""
from typing import Optional
import logging
import pathlib
import sqlite3
import contextlib
import subprocess
import urllib.parse
from clldutils.path import ensure_cmd
# Public names of this module.
__all__ = ['DB', 'TempDB', 'FreshDB']
# Names of the command line executables invoked via `subprocess` when the dialect is "postgresql".
CREATEDB = 'createdb'
DROPDB = 'dropdb'
PSQL = 'psql'
class DB:
    """
    A relational database specified by DB URL. Supported dialects are "sqlite" and "postgresql".

    For "postgresql", the database is managed by running the `createdb`, `dropdb` and `psql`
    commands with just the database name; no host, port or credentials from the URL are passed
    on to these commands.

    .. seealso::

        `<https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine>`_
    """
    # Key under which `from_settings` looks up the DB URL.
    settings_key = 'sqlalchemy.url'

    def __init__(self, url: str, log: Optional[logging.Logger] = None):
        """
        :param url: The DB URL, e.g. `sqlite:///path/to/db.sqlite`.
        :param log: Optional logger; if given, `create` and `drop` log what they do at level INFO.
        :raises NotImplementedError: If the dialect is neither "sqlite" nor "postgresql".
        :raises AssertionError: If `ensure_cmd` returns a falsy value for a required PostgreSQL \
        command.
        """
        self.log = log
        self.components = urllib.parse.urlparse(url)
        if self.dialect not in ('sqlite', 'postgresql'):
            raise NotImplementedError(self.dialect)
        if self.dialect == 'postgresql':
            # Checked explicitly rather than inside an `assert` statement, because asserts are
            # removed under `python -O`, which would skip the `ensure_cmd` calls altogether.
            # NOTE(review): `ensure_cmd` presumably raises itself for a missing command - confirm.
            for cmd in (CREATEDB, DROPDB, PSQL):
                if not ensure_cmd(cmd):
                    raise AssertionError(f'required command not available: {cmd}')

    def __str__(self):
        """The DB URL, as re-assembled from its parsed components."""
        return self.components.geturl()

    @classmethod
    def from_settings(cls, settings: dict, log: Optional[logging.Logger] = None):
        """
        Instantiate a DB looking up the URL in a `dict`.

        :param settings: A `dict` as returned - e.g. - by `pyramid.paster.get_appsettings`.
        :param log: Optional logger, passed on to the constructor.
        :raises KeyError: If `settings` has no item for `settings_key`.
        """
        return cls(settings[cls.settings_key], log=log)

    @property
    def dialect(self) -> str:
        """The database dialect, i.e. the URL scheme without any "+driver" suffix."""
        return self.components.scheme.split('+')[0]

    @property
    def name(self) -> str:
        """
        The database name, i.e. the URL path without its leading slash.

        For sqlite this is the path of the database file.

        :raises AssertionError: If the URL path does not start with "/".
        """
        path = self.components.path
        # Raised explicitly (not via `assert`) so the check also runs under `python -O`. Without
        # it, a URL lacking a path would yield the name '', which `pathlib.Path` treats as the
        # current directory.
        if not path.startswith('/'):
            raise AssertionError(f'DB URL has no database name: {self}')
        return path[1:].split('?')[0]

    def exists(self) -> bool:
        """
        Does the database exist?

        For "postgresql" the name is looked up in the output of `psql -lqt`, for "sqlite" the
        existence of the database file is checked.
        """
        if self.dialect == 'postgresql':
            listing = subprocess.check_output([PSQL, '-lqt']).decode('utf8')
            # One line per database, columns separated by "|", with the name in the first column.
            known = {line.split('|')[0].strip() for line in listing.splitlines()}
            return self.name in known
        return pathlib.Path(self.name).exists()

    def create(self) -> None:
        """
        Create the database.

        :raises ValueError: If the dialect is "sqlite" and the database file already exists.
        :raises subprocess.CalledProcessError: If the dialect is "postgresql" and `createdb` \
        exits with a non-zero status.
        """
        if self.log:
            self.log.info(f'creating {self}')
        if self.dialect == 'postgresql':
            subprocess.check_call([CREATEDB, self.name])
        else:  # self.dialect == 'sqlite'
            if self.exists():
                raise ValueError('db exists!')
            # Opening and immediately closing a connection creates the (empty) database file.
            with contextlib.closing(sqlite3.connect(self.name)):
                pass

    def drop(self) -> None:
        """
        Drop the database or remove the db file for sqlite.

        Does nothing if the database does not exist.
        """
        if not self.exists():
            return
        if self.log:
            self.log.info(f'dropping {self}')
        if self.dialect == 'postgresql':
            subprocess.check_call([DROPDB, self.name])
        else:
            pathlib.Path(self.name).unlink()
class FreshDB(DB):
    """
    Context manager providing a newly created database.

    On entry, any existing database for the URL is dropped and an empty one is created. The
    database is left in place on exit.

    .. code-block:: python

        >>> with FreshDB(url) as db:
        ...     assert db.exists()
    """

    def __enter__(self):
        """Drop the database if it exists, create it anew and return this instance."""
        self.drop()
        self.create()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Do nothing: the database is kept and exceptions from the `with` body propagate."""
        return None
class TempDB(DB):
    """
    Context manager to use a temporary database.

    The database must not exist on entry; it is created on entry and dropped again on exit.

    .. code-block:: python

        >>> with TempDB(url) as db:
        ...     assert db.exists()
        >>> assert not db.exists()
    """

    def __enter__(self):
        """
        Create the database and return this instance.

        :raises AssertionError: If the database already exists.
        """
        # Raised explicitly instead of using an `assert` statement, so that the guard against
        # taking over (and later dropping) an existing database is also active under `python -O`.
        if self.exists():
            raise AssertionError(f'database already exists: {self}')
        self.create()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Drop the database, whether or not the `with` body raised."""
        self.drop()