######################################################################################################################
import hashlib
import os
import time
import logging
from functools import partialmethod
from datetime import datetime, timezone
from types import MethodType
from sqlalchemy import create_engine, MetaData, inspect
from sqlalchemy.pool import NullPool
from sqlalchemy.event import listen
from sqlalchemy.exc import DatabaseError, DBAPIError, ArgumentError
from sqlalchemy.engine.url import make_url, URL
from alembic.migration import MigrationContext
from alembic.environment import EnvironmentContext
from alembic.script import ScriptDirectory
from alembic.config import Config
from alembic.util.exc import CommandError
from .filters.tools import pop_filter_configs, apply_filter_stack, load_filters
from .spine_db_client import get_db_url_from_server
from .mapped_items import item_factory
from .db_mapping_base import DatabaseMappingBase
from .db_mapping_commit_mixin import DatabaseMappingCommitMixin
from .db_mapping_query_mixin import DatabaseMappingQueryMixin
from .exception import SpineDBAPIError, SpineDBVersionError, SpineIntegrityError
from .query import Query
from .compatibility import compatibility_transformations
from .helpers import (
_create_first_spine_database,
create_new_spine_database_from_bind,
compare_schemas,
model_meta,
copy_database_bind,
Asterisk,
)
logging.getLogger("alembic").setLevel(logging.CRITICAL)
[docs]class DatabaseMapping(DatabaseMappingQueryMixin, DatabaseMappingCommitMixin, DatabaseMappingBase):
"""Enables communication with a Spine DB.
The DB is incrementally mapped into memory as data is requested/modified, following the :ref:`db_mapping_schema`.
Data is typically retrieved using :meth:`get_item` or :meth:`get_items`.
If the requested data is already in memory, it is returned from there;
otherwise it is fetched from the DB, stored in memory, and then returned.
In other words, the data is fetched from the DB exactly once.
For convenience, we also provide specialized 'get' methods for each item type, e.g., :meth:`get_entity_item`
and :meth:`get_entity_items`.
Data is added via :meth:`add_item`;
updated via :meth:`update_item`;
removed via :meth:`remove_item`;
and restored via :meth:`restore_item`.
All the above methods modify the in-memory mapping (not the DB itself).
These methods also fetch data from the DB into the in-memory mapping to perform the necessary integrity checks
(unique and foreign key constraints).
For convenience, we also provide specialized 'add', 'update', 'remove', and 'restore' methods
for each item type, e.g.,
:meth:`add_entity_item`, :meth:`update_entity_item`, :meth:`remove_entity_item`, :meth:`restore_entity_item`.
Modifications to the in-memory mapping are committed (written) to the DB via :meth:`commit_session`,
or rolled back (discarded) via :meth:`rollback_session`.
The DB fetch status is reset via :meth:`refresh_session`.
This allows new items in the DB (added by other clients in the meantime) to be retrieved as well.
You can also control the fetching process via :meth:`fetch_more` and/or :meth:`fetch_all`.
For example, you can call :meth:`fetch_more` in a dedicated thread while you do some work on the main thread.
This will nicely place items in the in-memory mapping so you can access them later, without
the overhead of fetching them from the DB.
The :meth:`query` method is also provided as an alternative way to retrieve data from the DB
while bypassing the in-memory mapping entirely.
You can use this class as a context manager, e.g.::
with DatabaseMapping(db_url) as db_map:
# Do stuff with db_map
...
"""
_sq_name_by_item_type = {
"alternative": "alternative_sq",
"scenario": "scenario_sq",
"scenario_alternative": "scenario_alternative_sq",
"entity_class": "wide_entity_class_sq",
"superclass_subclass": "superclass_subclass_sq",
"entity": "wide_entity_sq",
"entity_group": "entity_group_sq",
"entity_alternative": "entity_alternative_sq",
"parameter_value_list": "parameter_value_list_sq",
"list_value": "list_value_sq",
"parameter_definition": "parameter_definition_sq",
"parameter_value": "parameter_value_sq",
"metadata": "metadata_sq",
"entity_metadata": "entity_metadata_sq",
"parameter_value_metadata": "parameter_value_metadata_sq",
"commit": "commit_sq",
}
def __init__(
self,
db_url,
username=None,
upgrade=False,
backup_url="",
codename=None,
create=False,
apply_filters=True,
memory=False,
sqlite_timeout=1800,
):
"""
Args:
db_url (str or :class:`~sqlalchemy.engine.url.URL`): A URL in RFC-1738 format pointing to the database
to be mapped, or to a DB server.
username (str, optional): A user name. If not given, it gets replaced by the string `anon`.
upgrade (bool, optional): Whether the DB at the given `url` should be upgraded to the most recent
version.
backup_url (str, optional): A URL to backup the DB before upgrading.
codename (str, optional): A name to identify this object in your application.
create (bool, optional): Whether to create a new Spine DB at the given `url` if it's not already one.
apply_filters (bool, optional): Whether to apply filters in the `url`'s query segment.
memory (bool, optional): Whether to use a SQLite memory DB as replacement for the original one.
sqlite_timeout (int, optional): The number of seconds to wait before raising SQLite connection errors.
"""
super().__init__()
# FIXME: We should also check the server memory property and use it here
db_url = get_db_url_from_server(db_url)
self.db_url = str(db_url)
if isinstance(db_url, str):
filter_configs, db_url = pop_filter_configs(db_url)
elif isinstance(db_url, URL):
filter_configs = db_url.query.pop("spinedbfilter", [])
else:
filter_configs = []
self._filter_configs = filter_configs if apply_filters else None
try:
self.sa_url = make_url(db_url)
except ArgumentError:
raise SpineDBAPIError("Could not parse the given URL. Please check that it is valid.")
self.username = username if username else "anon"
self.codename = self._make_codename(codename)
self._memory = memory
self._memory_dirty = False
self._original_engine = self.create_engine(
self.sa_url, create=create, upgrade=upgrade, backup_url=backup_url, sqlite_timeout=sqlite_timeout
)
# NOTE: The NullPool is needed to receive the close event (or any events), for some reason
self.engine = create_engine("sqlite://", poolclass=NullPool) if self._memory else self._original_engine
listen(self.engine, "close", self._receive_engine_close)
if self._memory:
copy_database_bind(self.engine, self._original_engine)
self._metadata = MetaData(self.engine)
self._metadata.reflect()
self._tablenames = [t.name for t in self._metadata.sorted_tables]
if self._filter_configs is not None:
stack = load_filters(self._filter_configs)
apply_filter_stack(self, stack)
def __enter__(self):
return self
def __exit__(self, _exc_type, _exc_val, _exc_tb):
self.close()
def __del__(self):
self.close()
@staticmethod
def item_types():
return [x for x in DatabaseMapping._sq_name_by_item_type if not item_factory(x).is_protected]
@staticmethod
def all_item_types():
return list(DatabaseMapping._sq_name_by_item_type)
@staticmethod
def item_factory(item_type):
return item_factory(item_type)
def _query_commit_count(self):
return self.query(self.commit_sq).count()
def _make_sq(self, item_type):
sq_name = self._sq_name_by_item_type[item_type]
return getattr(self, sq_name)
def _make_codename(self, codename):
if codename:
return str(codename)
if not self.sa_url.drivername.startswith("sqlite"):
return self.sa_url.database
if self.sa_url.database is not None:
return os.path.splitext(os.path.basename(self.sa_url.database))[0]
hashing = hashlib.sha1()
hashing.update(bytes(str(time.time()), "utf-8"))
return hashing.hexdigest()
@staticmethod
[docs] def get_upgrade_db_prompt_data(url, create=False):
"""Returns data to prompt the user what to do if the DB at the given url is not the latest version.
If it is, then returns None.
Args:
url (str)
create (bool,optional)
Returns:
str: The title of the prompt
str: The text of the prompt
dict: Mapping different options, to kwargs to pass to DatabaseMapping constructor in order to apply them
dict or None: Mapping different options, to additional notes
int or None: The preferred option if any
"""
sa_url = make_url(url)
try:
DatabaseMapping.create_engine(sa_url, create=create)
return None
except SpineDBVersionError as v_err:
if v_err.upgrade_available:
title = "Incompatible database version"
text = (
f"The database at <br><center>'{sa_url}'</center><br> is at revision <b>{v_err.current}</b> "
f"and needs to be upgraded to revision <b>{v_err.expected}</b> "
"in order to be used with the current version of Spine."
"<p><b>WARNING</b>: After the upgrade, the database may no longer be used with previous versions."
)
if sa_url.drivername == "sqlite":
folder_name, file_name = os.path.split(sa_url.database)
file_name, _ = os.path.splitext(file_name)
else:
folder_name = os.path.expanduser("~")
file_name = sa_url.database
database = os.path.join(folder_name, file_name + "." + v_err.current)
backup_url = str(URL("sqlite", database=database))
option_to_kwargs = {
"Backup and upgrade": dict(upgrade=True, backup_url=backup_url),
"Just upgrade": dict(upgrade=True),
}
notes = {"Backup and upgrade": f"The backup will be written at '{backup_url}'"}
preferred = 0
else:
title = "Unsupported database version"
text = (
f"The database at <br><center>'{sa_url}'</center><br> is at revision <b>{v_err.current}</b> "
f"while this version of Spine supports revisions up to <b>{v_err.expected}</b>."
"<p>Please upgrade Spine to use this database."
)
option_to_kwargs = {}
notes = None
preferred = None
return title, text, option_to_kwargs, notes, preferred
@staticmethod
def create_engine(sa_url, create=False, upgrade=False, backup_url="", sqlite_timeout=1800):
if sa_url.drivername == "sqlite":
connect_args = {"timeout": sqlite_timeout}
else:
connect_args = {}
try:
engine = create_engine(sa_url, connect_args=connect_args)
with engine.connect():
pass
except Exception as e:
raise SpineDBAPIError(
f"Could not connect to '{sa_url}': {str(e)}. "
f"Please make sure that '{sa_url}' is a valid sqlalchemy URL."
) from None
with engine.begin() as connection:
if sa_url.drivername == "sqlite":
connection.execute("BEGIN IMMEDIATE")
# TODO: Do other dialects need to lock?
migration_context = MigrationContext.configure(connection)
try:
current = migration_context.get_current_revision()
except DatabaseError as error:
raise SpineDBAPIError(str(error)) from None
if current is None:
# No revision information. Check that the schema of the given url corresponds to a 'first' Spine db
# Otherwise we either raise or create a new Spine db at the url.
ref_engine = _create_first_spine_database("sqlite://")
if not compare_schemas(engine, ref_engine):
if not create or inspect(engine).get_table_names():
raise SpineDBAPIError(
"Unable to determine db revision. "
f"Please check that\n\n\t{sa_url}\n\nis the URL of a valid Spine db."
)
create_new_spine_database_from_bind(connection)
return engine
config = Config()
config.set_main_option("script_location", "spinedb_api:alembic")
script = ScriptDirectory.from_config(config)
head = script.get_current_head()
if current != head:
if not upgrade:
try:
script.get_revision(current) # Check if current revision is part of alembic rev. history
except CommandError:
# Can't find 'current' revision
raise SpineDBVersionError(
url=sa_url, current=current, expected=head, upgrade_available=False
) from None
raise SpineDBVersionError(url=sa_url, current=current, expected=head)
if backup_url:
dst_engine = create_engine(backup_url)
copy_database_bind(dst_engine, engine)
# Upgrade function
def upgrade_to_head(rev, context):
return script._upgrade_revs("head", rev)
with EnvironmentContext(
config,
script,
fn=upgrade_to_head,
as_sql=False,
starting_rev=None,
destination_rev="head",
tag=None,
) as environment_context:
environment_context.configure(connection=connection, target_metadata=model_meta)
with environment_context.begin_transaction():
environment_context.run_migrations()
return engine
def _receive_engine_close(self, dbapi_con, _connection_record):
if self._memory_dirty:
copy_database_bind(self._original_engine, self.engine)
@staticmethod
def real_item_type(tablename):
return {
"object_class": "entity_class",
"relationship_class": "entity_class",
"object": "entity",
"relationship": "entity",
}.get(tablename, tablename)
@staticmethod
def _convert_legacy(tablename, item):
if tablename in ("entity_class", "entity"):
object_class_id_list = tuple(item.pop("object_class_id_list", ()))
if object_class_id_list:
item["dimension_id_list"] = object_class_id_list
object_class_name_list = tuple(item.pop("object_class_name_list", ()))
if object_class_name_list:
item["dimension_name_list"] = object_class_name_list
if tablename == "entity":
object_id_list = tuple(item.pop("object_id_list", ()))
if object_id_list:
item["element_id_list"] = object_id_list
object_name_list = tuple(item.pop("object_name_list", ()))
if object_name_list:
item["element_name_list"] = object_name_list
if tablename in ("parameter_definition", "parameter_value"):
entity_class_id = item.pop("object_class_id", None) or item.pop("relationship_class_id", None)
if entity_class_id:
item["entity_class_id"] = entity_class_id
if tablename == "parameter_value":
entity_id = item.pop("object_id", None) or item.pop("relationship_id", None)
if entity_id:
item["entity_id"] = entity_id
def get_import_alternative_name(self):
if self._import_alternative_name is None:
self._create_import_alternative()
return self._import_alternative_name
def _create_import_alternative(self):
"""Creates the alternative to be used as default for all import operations."""
self._import_alternative_name = "Base"
def override_create_import_alternative(self, method):
self._create_import_alternative = MethodType(method, self)
self._import_alternative_name = None
def get_table(self, tablename):
# For tests
return self._metadata.tables[tablename]
[docs] def get_item(self, item_type, fetch=True, skip_removed=True, **kwargs):
"""Finds and returns an item matching the arguments, or None if none found.
Example::
with DatabaseMapping(db_url) as db_map:
prince = db_map.get_item("entity", entity_class_name="musician", name="Prince")
Args:
item_type (str): One of <spine_item_types>.
fetch (bool, optional): Whether to fetch the DB in case the item is not found in memory.
skip_removed (bool, optional): Whether to ignore removed items.
**kwargs: Fields and values for one the unique keys as specified for the item type
in :ref:`db_mapping_schema`.
Returns:
:class:`PublicItem` or None
"""
item_type = self.real_item_type(item_type)
mapped_table = self.mapped_table(item_type)
mapped_table.check_fields(kwargs, valid_types=(type(None),))
item = mapped_table.find_item(kwargs)
if not item and fetch:
self.do_fetch_more(item_type, offset=0, limit=None, **kwargs)
item = mapped_table.find_item(kwargs)
if not item or (skip_removed and not item.is_valid()):
return {}
return item.public_item
[docs] def get_items(self, item_type, fetch=True, skip_removed=True, **kwargs):
"""Finds and returns all the items of one type.
Args:
item_type (str): One of <spine_item_types>.
fetch (bool, optional): Whether to fetch the DB before returning the items.
skip_removed (bool, optional): Whether to ignore removed items.
**kwargs: Fields and values for one the unique keys as specified for the item type
in :ref:`db_mapping_schema`.
Returns:
list(:class:`PublicItem`): The items.
"""
item_type = self.real_item_type(item_type)
mapped_table = self.mapped_table(item_type)
mapped_table.check_fields(kwargs, valid_types=(type(None),))
if fetch:
self.do_fetch_more(item_type, offset=0, limit=None, **kwargs)
get_items = mapped_table.valid_values if skip_removed else mapped_table.values
return [x.public_item for x in get_items() if all(x.get(k) == v for k, v in kwargs.items())]
@staticmethod
def _modify_items(function, *items, strict=False):
modified, errors = [], []
for item in items:
item, error = function(item)
if error:
if strict:
raise SpineIntegrityError(error)
errors.append(error)
if item:
modified.append(item)
return modified, errors
[docs] def add_item(self, item_type, check=True, **kwargs):
"""Adds an item to the in-memory mapping.
Example::
with DatabaseMapping(db_url) as db_map:
db_map.add_item("entity_class", name="musician")
db_map.add_item("entity", entity_class_name="musician", name="Prince")
Args:
item_type (str): One of <spine_item_types>.
**kwargs: Fields and values as specified for the item type in :ref:`db_mapping_schema`.
Returns:
tuple(:class:`PublicItem` or None, str): The added item and any errors.
"""
item_type = self.real_item_type(item_type)
mapped_table = self.mapped_table(item_type)
self._convert_legacy(item_type, kwargs)
if not check:
return mapped_table.add_item(kwargs), None
checked_item, error = mapped_table.checked_item_and_error(kwargs)
return (mapped_table.add_item(checked_item).public_item if checked_item else None, error)
[docs] def add_items(self, item_type, *items, check=True, strict=False):
"""Adds many items to the in-memory mapping.
Args:
item_type (str): One of <spine_item_types>.
*items (Iterable(dict)): One or more :class:`dict` objects mapping fields to values of the item type,
as specified in :ref:`db_mapping_schema`.
strict (bool): Whether or not the method should raise :exc:`~.exception.SpineIntegrityError`
if the insertion of one of the items violates an integrity constraint.
Returns:
tuple(list(:class:`PublicItem`),list(str)): items successfully added and found violations.
"""
return self._modify_items(lambda x: self.add_item(item_type, check=check, **x), *items, strict=strict)
[docs] def update_item(self, item_type, check=True, **kwargs):
"""Updates an item in the in-memory mapping.
Example::
with DatabaseMapping(db_url) as db_map:
prince = db_map.get_item("entity", entity_class_name="musician", name="Prince")
db_map.update_item(
"entity", id=prince["id"], name="the Artist", description="Formerly known as Prince."
)
Args:
item_type (str): One of <spine_item_types>.
id (int): The id of the item to update.
**kwargs: Fields to update and their new values as specified for the item type in :ref:`db_mapping_schema`.
Returns:
tuple(:class:`PublicItem` or None, str): The updated item and any errors.
"""
item_type = self.real_item_type(item_type)
mapped_table = self.mapped_table(item_type)
self._convert_legacy(item_type, kwargs)
if not check:
return mapped_table.update_item(kwargs), None
checked_item, error = mapped_table.checked_item_and_error(kwargs, for_update=True)
return (mapped_table.update_item(checked_item._asdict()).public_item if checked_item else None, error)
[docs] def update_items(self, item_type, *items, check=True, strict=False):
"""Updates many items in the in-memory mapping.
Args:
item_type (str): One of <spine_item_types>.
*items (Iterable(dict)): One or more :class:`dict` objects mapping fields to values of the item type,
as specified in :ref:`db_mapping_schema` and including the `id`.
strict (bool): Whether or not the method should raise :exc:`~.exception.SpineIntegrityError`
if the update of one of the items violates an integrity constraint.
Returns:
tuple(list(:class:`PublicItem`),list(str)): items successfully updated and found violations.
"""
return self._modify_items(lambda x: self.update_item(item_type, check=check, **x), *items, strict=strict)
[docs] def add_update_item(self, item_type, check=True, **kwargs):
"""Adds an item to the in-memory mapping if it doesn't exist; otherwise updates the current one.
Args:
item_type (str): One of <spine_item_types>.
**kwargs: Fields and values as specified for the item type in :ref:`db_mapping_schema`.
Returns:
tuple(:class:`PublicItem` or None, :class:`PublicItem` or None, str): The added item if any,
the updated item if any, and any errors.
"""
added, add_error = self.add_item(item_type, check=check, **kwargs)
if not add_error:
return added, None, add_error
updated, update_error = self.update_item(item_type, check=check, **kwargs)
if not update_error:
return None, updated, update_error
return None, None, add_error or update_error
[docs] def add_update_items(self, item_type, *items, check=True, strict=False):
"""Adds or updates many items into the in-memory mapping.
Args:
item_type (str): One of <spine_item_types>.
*items (Iterable(dict)): One or more :class:`dict` objects mapping fields to values of the item type,
as specified in :ref:`db_mapping_schema`.
strict (bool): Whether or not the method should raise :exc:`~.exception.SpineIntegrityError`
if the insertion of one of the items violates an integrity constraint.
Returns:
tuple(list(:class:`PublicItem`),list(:class:`PublicItem`),list(str)): items successfully added,
items successfully updated, and found violations.
"""
def _function(item):
added, updated, error = self.add_update_item(item_type, check=check, **item)
return (added, updated), error
added_updated, errors = self._modify_items(_function, *items, strict=strict)
added, updated = zip(*added_updated) if added_updated else ([], [])
added = [x for x in added if x]
updated = [x for x in updated if x]
return added, updated, errors
[docs] def remove_item(self, item_type, id_, check=True):
"""Removes an item from the in-memory mapping.
Example::
with DatabaseMapping(db_url) as db_map:
prince = db_map.get_item("entity", entity_class_name="musician", name="Prince")
db_map.remove_item("entity", prince["id"])
Args:
item_type (str): One of <spine_item_types>.
id_ (int): The id of the item to remove.
Returns:
tuple(:class:`PublicItem` or None, str): The removed item and any errors.
"""
item_type = self.real_item_type(item_type)
mapped_table = self.mapped_table(item_type)
item, error = mapped_table.item_to_remove_and_error(id_)
if check and error:
return None, error
return mapped_table.remove_item(item).public_item, None
[docs] def remove_items(self, item_type, *ids, check=True, strict=False):
"""Removes many items from the in-memory mapping.
Args:
item_type (str): One of <spine_item_types>.
*ids (Iterable(int)): Ids of items to be removed.
strict (bool): Whether or not the method should raise :exc:`~.exception.SpineIntegrityError`
if the update of one of the items violates an integrity constraint.
Returns:
tuple(list(:class:`PublicItem`),list(str)): items successfully removed and found violations.
"""
item_type = self.real_item_type(item_type)
ids = set(ids)
if item_type == "alternative":
# Do not remove the Base alternative
ids.discard(1)
if not ids:
return [], []
return self._modify_items(lambda x: self.remove_item(item_type, x, check=check), *ids, strict=strict)
def cascade_remove_items(self, cache=None, **kwargs):
# Legacy
for item_type, ids in kwargs.items():
self.remove_items(item_type, *ids)
[docs] def restore_item(self, item_type, id_):
"""Restores a previously removed item into the in-memory mapping.
Example::
with DatabaseMapping(db_url) as db_map:
prince = db_map.get_item("entity", skip_remove=False, entity_class_name="musician", name="Prince")
db_map.restore_item("entity", prince["id"])
Args:
item_type (str): One of <spine_item_types>.
id_ (int): The id of the item to restore.
Returns:
tuple(:class:`PublicItem` or None, str): The restored item if any.
"""
item_type = self.real_item_type(item_type)
mapped_table = self.mapped_table(item_type)
return mapped_table.restore_item(id_).public_item
[docs] def restore_items(self, item_type, *ids):
"""Restores many previously removed items into the in-memory mapping.
Args:
item_type (str): One of <spine_item_types>.
*ids (Iterable(int)): Ids of items to be removed.
Returns:
list(:class:`PublicItem`): the restored items.
"""
if not ids:
return []
return [self.restore_item(item_type, id_) for id_ in ids]
[docs] def purge_items(self, item_type):
"""Removes all items of one type.
Args:
item_type (str): One of <spine_item_types>.
Returns:
bool: True if any data was removed, False otherwise.
"""
return bool(self.remove_items(item_type, Asterisk))
[docs] def fetch_more(self, item_type, offset=0, limit=None, **kwargs):
"""Fetches items from the DB into the in-memory mapping, incrementally.
Args:
item_type (str): One of <spine_item_types>.
offset (int): The initial row.
limit (int): The maximum number of rows to fetch.
**kwargs: Fields and values for one the unique keys as specified for the item type
in :ref:`db_mapping_schema`.
Returns:
list(:class:`PublicItem`): The items fetched.
"""
item_type = self.real_item_type(item_type)
return [x.public_item for x in self.do_fetch_more(item_type, offset=offset, limit=limit, **kwargs)]
[docs] def fetch_all(self, *item_types):
"""Fetches items from the DB into the in-memory mapping.
Unlike :meth:`fetch_more`, this method fetches entire tables.
Args:
*item_types (Iterable(str)): One or more of <spine_item_types>.
If none given, then the entire DB is fetched.
"""
item_types = set(self.item_types()) if not item_types else set(item_types) & set(self.item_types())
for item_type in item_types:
item_type = self.real_item_type(item_type)
self.do_fetch_more(item_type)
[docs] def query(self, *args, **kwargs):
"""Returns a :class:`~spinedb_api.query.Query` object to execute against the mapped DB.
To perform custom ``SELECT`` statements, call this method with one or more of the documented
subquery properties of :class:`~spinedb_api.db_mapping_query_mixin.DatabaseMappingQueryMixin` returning
:class:`~sqlalchemy.sql.expression.Alias` objetcs.
For example, to select the entity class with ``id`` equal to 1::
from spinedb_api import DatabaseMapping
url = 'sqlite:///spine.db'
...
db_map = DatabaseMapping(url)
db_map.query(db_map.entity_class_sq).filter_by(id=1).one_or_none()
To perform more complex queries, just use the :class:`~spinedb_api.query.Query` interface
(which is a close clone of SQL Alchemy's :class:`~sqlalchemy.orm.query.Query`).
For example, to select all entity class names and the names of their entities concatenated in a comma-separated
string::
from sqlalchemy import func
db_map.query(
db_map.entity_class_sq.c.name, func.group_concat(db_map.entity_sq.c.name)
).filter(
db_map.entity_sq.c.class_id == db_map.entity_class_sq.c.id
).group_by(db_map.entity_class_sq.c.name).all()
Returns:
:class:`~spinedb_api.query.Query`: The resulting query.
"""
return Query(self.engine, *args)
[docs] def commit_session(self, comment, apply_compatibility_transforms=True):
"""Commits the changes from the in-memory mapping to the database.
Args:
comment (str): commit message
apply_compatibility_transforms (bool): if True, apply compatibility transforms
Returns:
tuple(list, list): compatibility transformations
"""
if not comment:
raise SpineDBAPIError("Commit message cannot be empty.")
with self.engine.begin() as connection:
commit = self._metadata.tables["commit"]
commit_item = dict(user=self.username, date=datetime.now(timezone.utc), comment=comment)
try:
# TODO: The below locks the DB in sqlite, how about other dialects?
commit_id = connection.execute(commit.insert(), commit_item).inserted_primary_key[0]
except DBAPIError as e:
raise SpineDBAPIError(f"Fail to commit: {e.orig.args}") from e
dirty_items = self._dirty_items()
if not dirty_items:
connection.execute(commit.delete().where(commit.c.id == commit_id))
raise SpineDBAPIError("Nothing to commit.")
for tablename, (to_add, to_update, to_remove) in dirty_items:
for item in to_add + to_update + to_remove:
item.commit(commit_id)
# Remove before add, to help with keeping integrity constraints
self._do_remove_items(connection, tablename, *{x["id"] for x in to_remove})
self._do_update_items(connection, tablename, *to_update)
self._do_add_items(connection, tablename, *to_add)
if self._memory:
self._memory_dirty = True
transformation_info = compatibility_transformations(connection, apply=apply_compatibility_transforms)
self._commit_count = self._query_commit_count()
return transformation_info
[docs] def rollback_session(self):
"""Discards all the changes from the in-memory mapping."""
if not self._rollback():
raise SpineDBAPIError("Nothing to rollback.")
if self._memory:
self._memory_dirty = False
[docs] def refresh_session(self):
"""Resets the fetch status so new items from the DB can be retrieved."""
self._refresh()
[docs] def has_external_commits(self):
"""Tests whether the database has had commits from other sources than this mapping.
Returns:
bool: True if database has external commits, False otherwise
"""
return self._commit_count != self._query_commit_count()
[docs] def close(self):
"""Closes this DB mapping. This is only needed if you're keeping a long-lived session.
For instance::
class MyDBMappingWrapper:
def __init__(self, url):
self._db_map = DatabaseMapping(url)
# More methods that do stuff with self._db_map
def __del__(self):
self._db_map.close()
Otherwise, the usage as context manager is recommended::
with DatabaseMapping(url) as db_map:
# Do stuff with db_map
...
# db_map.close() is automatically called when leaving this block
"""
self.closed = True
def add_ext_entity_metadata(self, *items, **kwargs):
metadata_items = self.get_metadata_to_add_with_item_metadata_items(*items)
self.add_items("metadata", *metadata_items, **kwargs)
return self.add_items("entity_metadata", *items, **kwargs)
def add_ext_parameter_value_metadata(self, *items, **kwargs):
metadata_items = self.get_metadata_to_add_with_item_metadata_items(*items)
self.add_items("metadata", *metadata_items, **kwargs)
return self.add_items("parameter_value_metadata", *items, **kwargs)
def get_metadata_to_add_with_item_metadata_items(self, *items):
metadata_items = ({"name": item["metadata_name"], "value": item["metadata_value"]} for item in items)
return [x for x in metadata_items if not self.mapped_table("metadata").find_item(x)]
def _update_ext_item_metadata(self, tablename, *items, **kwargs):
metadata_items = self.get_metadata_to_add_with_item_metadata_items(*items)
added, errors = self.add_items("metadata", *metadata_items, **kwargs)
updated, more_errors = self.update_items(tablename, *items, **kwargs)
return added + updated, errors + more_errors
def update_ext_entity_metadata(self, *items, **kwargs):
return self._update_ext_item_metadata("entity_metadata", *items, **kwargs)
def update_ext_parameter_value_metadata(self, *items, **kwargs):
return self._update_ext_item_metadata("parameter_value_metadata", *items, **kwargs)
def remove_unused_metadata(self):
used_metadata_ids = set()
for x in self.mapped_table("entity_metadata").valid_values():
used_metadata_ids.add(x["metadata_id"])
for x in self.mapped_table("parameter_value_metadata").valid_values():
used_metadata_ids.add(x["metadata_id"])
unused_metadata_ids = {x["id"] for x in self.mapped_table("metadata").valid_values()} - used_metadata_ids
self.remove_items("metadata", *unused_metadata_ids)
[docs] def get_filter_configs(self):
"""Returns the filters from this mapping's URL.
Returns:
list(dict):
"""
return self._filter_configs
# Define convenience methods
for it in DatabaseMapping.item_types():
setattr(DatabaseMapping, "get_" + it + "_item", partialmethod(DatabaseMapping.get_item, it))
setattr(DatabaseMapping, "get_" + it + "_items", partialmethod(DatabaseMapping.get_items, it))
setattr(DatabaseMapping, "add_" + it + "_item", partialmethod(DatabaseMapping.add_item, it))
setattr(DatabaseMapping, "update_" + it + "_item", partialmethod(DatabaseMapping.update_item, it))
setattr(DatabaseMapping, "add_update_" + it + "_item", partialmethod(DatabaseMapping.add_update_item, it))
setattr(DatabaseMapping, "remove_" + it + "_item", partialmethod(DatabaseMapping.remove_item, it))
setattr(DatabaseMapping, "restore_" + it + "_item", partialmethod(DatabaseMapping.restore_item, it))
# Astroid transform so DatabaseMapping looks like it has the convenience methods defined above
def _add_convenience_methods(node):
if node.name != "DatabaseMapping":
return node
def _a(item_type):
return "an" if any(item_type.lower().startswith(x) for x in "aeiou") else "a"
def _uq_fields(factory):
return {
f_name: factory.fields[f_name]
for f_names in factory._unique_keys
for f_name in set(f_names) & set(factory.fields.keys())
}
def _kwargs(fields):
def type_(f_dict):
return f_dict["type"].__name__ + (", optional" if f_dict.get("optional", False) else "")
return f"\n{padding}".join(
[f"{f_name} ({type_(f_dict)}): {f_dict['value']}" for f_name, f_dict in fields.items()]
)
padding = 20 * " "
for item_type in DatabaseMapping.item_types():
factory = DatabaseMapping.item_factory(item_type)
a = _a(item_type)
get_kwargs = _kwargs(_uq_fields(factory))
child = astroid.extract_node(
f'''
def get_{item_type}_item(self, fetch=True, skip_removed=True, **kwargs):
"""Finds and returns {a} `{item_type}` item matching the arguments, or None if none found.
Args:
fetch (bool, optional): Whether to fetch the DB in case the item is not found in memory.
skip_removed (bool, optional): Whether to ignore removed items.
{get_kwargs}
Returns:
:class:`PublicItem` or None
"""
'''
)
child.parent = node
node.body.append(child)
for item_type in DatabaseMapping.item_types():
factory = DatabaseMapping.item_factory(item_type)
a = _a(item_type)
get_kwargs = _kwargs(_uq_fields(factory))
child = astroid.extract_node(
f'''
def get_{item_type}_items(self, fetch=True, skip_removed=True, **kwargs):
"""Finds and returns all {item_type} items.
Args:
fetch (bool, optional): Whether to fetch the DB before returning the items.
skip_removed (bool, optional): Whether to ignore removed items.
{get_kwargs}
Returns:
list(:class:`PublicItem`): The items.
"""
'''
)
child.parent = node
node.body.append(child)
for item_type in DatabaseMapping.item_types():
factory = DatabaseMapping.item_factory(item_type)
a = _a(item_type)
add_kwargs = _kwargs(factory.fields)
child = astroid.extract_node(
f'''
def add_{item_type}_item(self, check=True, **kwargs):
"""Adds {a} `{item_type}` item to the in-memory mapping.
Args:
{add_kwargs}
Returns:
tuple(:class:`PublicItem` or None, str): The added item and any errors.
"""
'''
)
child.parent = node
node.body.append(child)
for item_type in DatabaseMapping.item_types():
factory = DatabaseMapping.item_factory(item_type)
a = _a(item_type)
update_kwargs = f"id (int): The id of the item to update.\n{padding}" + _kwargs(factory.fields)
child = astroid.extract_node(
f'''
def update_{item_type}_item(self, check=True, **kwargs):
"""Updates {a} `{item_type}` item in the in-memory mapping.
Args:
{update_kwargs}
Returns:
tuple(:class:`PublicItem` or None, str): The updated item and any errors.
"""
'''
)
child.parent = node
node.body.append(child)
for item_type in DatabaseMapping.item_types():
factory = DatabaseMapping.item_factory(item_type)
a = _a(item_type)
add_kwargs = _kwargs(factory.fields)
child = astroid.extract_node(
f'''
def add_update_{item_type}_item(self, check=True, **kwargs):
"""Adds {a} `{item_type}` item to the in-memory mapping if it doesn't exist;
otherwise updates the current one.
Args:
{add_kwargs}
Returns:
tuple(:class:`PublicItem` or None, :class:`PublicItem` or None, str): The added item if any,
the updated item if any, and any errors.
"""
'''
)
child.parent = node
node.body.append(child)
for item_type in DatabaseMapping.item_types():
child = astroid.extract_node(
f'''
def remove_{item_type}_item(self, id):
"""Removes {a} `{item_type}` item from the in-memory mapping.
Args:
id (int): the id of the item to remove.
Returns:
tuple(:class:`PublicItem` or None, str): The removed item if any.
"""
'''
)
child.parent = node
node.body.append(child)
for item_type in DatabaseMapping.item_types():
child = astroid.extract_node(
f'''
def restore_{item_type}_item(self, id):
"""Restores a previously removed `{item_type}` item into the in-memory mapping.
Args:
id (int): the id of the item to restore.
Returns:
tuple(:class:`PublicItem` or None, str): The restored item if any.
"""
'''
)
child.parent = node
node.body.append(child)
return node
try:
import astroid
astroid.MANAGER.register_transform(astroid.ClassDef, _add_convenience_methods)
except ModuleNotFoundError:
pass