######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Database API contributors
# This file is part of Spine Database API.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""
Functions for importing data into a Spine database in a standard format.
This functionaly is equivalent to the one provided by :meth:`.DatabaseMapping.add_update_item`,
but the syntax is a little more compact.
"""
from collections import defaultdict
from .parameter_value import to_database, fix_conflict
from .helpers import _parse_metadata
[docs]def import_data(db_map, unparse_value=to_database, on_conflict="merge", **kwargs):
"""Imports data into a Spine database using a standard format.
Example::
entity_classes = [
('example_class', ()), ('other_class', ()), ('multi_d_class', ('example_class', 'other_class'))
]
alternatives = [('example_alternative', 'An example')]
scenarios = [('example_scenario', 'An example')]
scenario_alternatives = [
('example_scenario', 'example_alternative'), ('example_scenario', 'Base', 'example_alternative')
]
parameter_value_lists = [("example_list", "value1"), ("example_list", "value2")]
parameter_definitions = [('example_class', 'example_parameter'), ('multi_d_class', 'other_parameter')]
entities = [
('example_class', 'example_entity'),
('example_class', 'example_group'),
('example_class', 'example_member'),
('other_class', 'other_entity'),
('multi_d_class', ('example_entity', 'other_entity')),
]
entity_groups = [
('example_class', 'example_group', 'example_member'),
('example_class', 'example_group', 'example_entity'),
]
parameter_values = [
('example_object_class', 'example_entity', 'example_parameter', 3.14),
('multi_d_class', ('example_entity', 'other_entity'), 'rel_parameter', 2.718),
]
entity_alternatives = [
('example_class', 'example_entity', "example_alternative", True),
('example_class', 'example_entity', "example_alternative", False),
]
import_data(
db_map,
entity_classes=entity_classes,
alternatives=alternatives,
scenarios=scenarios,
scenario_alternatives=scenario_alternatives,
parameter_value_lists=parameter_value_lists,
parameter_definitions=parameter_definitions,
entities=entities,
entity_groups=entity_groups,
parameter_values=parameter_values,
entity_alternatives=entity_alternatives,
)
Args:
db_map (spinedb_api.DiffDatabaseMapping): database mapping
on_conflict (str): Conflict resolution strategy for :func:`parameter_value.fix_conflict`
entity_classes (list(tuple(str,tuple,str,int)): tuples of
(name, dimension name tuple, description, display icon integer)
parameter_definitions (list(tuple(str,str,str,str)):
tuples of (class name, parameter name, default value, parameter value list name, description)
entities: (list(tuple(str,str or tuple(str)): tuples of (class name, entity name or element name list)
entity_alternatives: (list(tuple(str,str or tuple(str),str,bool): tuples of
(class name, entity name or element name list, alternative name, activity)
entity_groups (list(tuple(str,str,str))): tuples of (class name, group entity name, member entity name)
parameter_values (list(tuple(str,str or tuple(str),str,str|numeric,str]):
tuples of (class name, entity name or element name list, parameter name, value, alternative name)
alternatives (list(str,str)): tuples of (name, description)
scenarios (list(str,str)): tuples of (name, description)
scenario_alternatives (list(str,str,str)): tuples of
(scenario name, alternative name, preceeding alternative name)
parameter_value_lists (list(str,str|numeric)): tuples of (list name, value)
Returns:
int: number of items imported
list: errors
"""
all_errors = []
num_imports = 0
for item_type, items in get_data_for_import(db_map, unparse_value=unparse_value, on_conflict=on_conflict, **kwargs):
if isinstance(items, tuple):
items, input_errors = items
all_errors.extend(input_errors)
added, updated, errors = db_map.add_update_items(item_type, *items, strict=False)
num_imports += len(added + updated)
all_errors.extend(errors)
return num_imports, all_errors
[docs]def get_data_for_import(
db_map,
unparse_value=to_database,
on_conflict="merge",
entity_classes=(),
entities=(),
entity_groups=(),
entity_alternatives=(), # TODO
parameter_definitions=(),
parameter_values=(),
parameter_value_lists=(),
alternatives=(),
scenarios=(),
scenario_alternatives=(),
metadata=(),
entity_metadata=(),
parameter_value_metadata=(),
superclass_subclasses=(),
# legacy
object_classes=(),
relationship_classes=(),
object_parameters=(),
relationship_parameters=(),
objects=(),
relationships=(),
object_groups=(),
object_parameter_values=(),
relationship_parameter_values=(),
object_metadata=(),
relationship_metadata=(),
object_parameter_value_metadata=(),
relationship_parameter_value_metadata=(),
# removed
tools=(),
features=(),
tool_features=(),
tool_feature_methods=(),
):
"""Yields data to import into a Spine DB.
Args:
db_map (spinedb_api.DiffDatabaseMapping): database mapping
on_conflict (str): Conflict resolution strategy for :func:`~spinedb_api.parameter_value.fix_conflict`
entity_classes (list(tuple(str,tuple,str,int)): tuples of
(name, dimension name tuple, description, display icon integer)
parameter_definitions (list(tuple(str,str,str,str)):
tuples of (class name, parameter name, default value, parameter value list name)
entities: (list(tuple(str,str or tuple(str)): tuples of (class name, entity name or element name list)
entity_alternatives: (list(tuple(str,str or tuple(str),str,bool): tuples of
(class name, entity name or element name list, alternative name, activity)
entity_groups (list(tuple(str,str,str))): tuples of (class name, group entity name, member entity name)
parameter_values (list(tuple(str,str or tuple(str),str,str|numeric,str]):
tuples of (class name, entity name or element name list, parameter name, value, alternative name)
alternatives (list(str,str)): tuples of (name, description)
scenarios (list(str,str)): tuples of (name, description)
scenario_alternatives (list(str,str,str)): tuples of
(scenario name, alternative name, preceeding alternative name)
parameter_value_lists (list(str,str|numeric)): tuples of (list name, value)
Yields:
str: item type
tuple(list,list,list): tuple of (items to add, items to update, errors)
"""
# NOTE: The order is important, because of references. E.g., we want to import alternatives before parameter_values
if alternatives:
yield ("alternative", _get_alternatives_for_import(db_map, alternatives))
if scenarios:
yield ("scenario", _get_scenarios_for_import(db_map, scenarios))
if scenario_alternatives:
yield ("scenario_alternative", _get_scenario_alternatives_for_import(db_map, scenario_alternatives))
if entity_classes:
for bucket in _get_entity_classes_for_import(db_map, entity_classes):
yield ("entity_class", bucket)
if object_classes: # Legacy
yield from get_data_for_import(db_map, entity_classes=_object_classes_to_entity_classes(object_classes))
if relationship_classes: # Legacy
yield from get_data_for_import(db_map, entity_classes=relationship_classes)
if superclass_subclasses:
yield ("superclass_subclass", _get_superclass_subclasses_for_import(db_map, superclass_subclasses))
if entities:
for bucket in _get_entities_for_import(db_map, entities):
yield ("entity", bucket)
if objects: # Legacy
yield from get_data_for_import(db_map, entities=objects)
if relationships: # Legacy
yield from get_data_for_import(db_map, entities=relationships)
if entity_alternatives:
yield ("entity_alternative", _get_entity_alternatives_for_import(db_map, entity_alternatives))
if entity_groups:
yield ("entity_group", _get_entity_groups_for_import(db_map, entity_groups))
if object_groups: # Legacy
yield from get_data_for_import(db_map, entity_groups=object_groups)
if parameter_value_lists:
yield ("parameter_value_list", _get_parameter_value_lists_for_import(db_map, parameter_value_lists))
yield ("list_value", _get_list_values_for_import(db_map, parameter_value_lists, unparse_value))
if parameter_definitions:
yield (
"parameter_definition",
_get_parameter_definitions_for_import(db_map, parameter_definitions, unparse_value),
)
if object_parameters: # Legacy
yield from get_data_for_import(db_map, unparse_value=unparse_value, parameter_definitions=object_parameters)
if relationship_parameters: # Legacy
yield from get_data_for_import(
db_map, unparse_value=unparse_value, parameter_definitions=relationship_parameters
)
if parameter_values:
yield (
"parameter_value",
_get_parameter_values_for_import(db_map, parameter_values, unparse_value, on_conflict),
)
if object_parameter_values: # Legacy
yield from get_data_for_import(
db_map, unparse_value=unparse_value, on_conflict=on_conflict, parameter_values=object_parameter_values
)
if relationship_parameter_values: # Legacy
yield from get_data_for_import(
db_map, unparse_value=unparse_value, on_conflict=on_conflict, parameter_values=relationship_parameter_values
)
if metadata:
yield ("metadata", _get_metadata_for_import(db_map, metadata))
if entity_metadata:
yield ("metadata", _get_metadata_for_import(db_map, (ent_metadata[2] for ent_metadata in entity_metadata)))
yield ("entity_metadata", _get_entity_metadata_for_import(db_map, entity_metadata))
if parameter_value_metadata:
yield (
"metadata",
_get_metadata_for_import(db_map, (pval_metadata[3] for pval_metadata in parameter_value_metadata)),
)
yield ("parameter_value_metadata", _get_parameter_value_metadata_for_import(db_map, parameter_value_metadata))
if object_metadata: # Legacy
yield from get_data_for_import(db_map, entity_metadata=object_metadata)
if relationship_metadata: # Legacy
yield from get_data_for_import(db_map, entity_metadata=relationship_metadata)
if object_parameter_value_metadata: # Legacy
yield from get_data_for_import(db_map, parameter_value_metadata=object_parameter_value_metadata)
if relationship_parameter_value_metadata: # Legacy
yield from get_data_for_import(db_map, parameter_value_metadata=relationship_parameter_value_metadata)
[docs]def import_superclass_subclasses(db_map, data):
"""Imports superclass_subclasses into a Spine database using a standard format.
Args:
db_map (spinedb_api.DiffDatabaseMapping): database mapping
data (list(tuple(str,tuple,str,int)): tuples of (superclass name, subclass name)
Returns:
int: number of items imported
list: errors
"""
return import_data(db_map, superclass_subclasses=data)
[docs]def import_entity_classes(db_map, data):
"""Imports entity classes into a Spine database using a standard format.
Args:
db_map (spinedb_api.DiffDatabaseMapping): database mapping
data (list(tuple(str,tuple,str,int,bool)): tuples of
(name, dimension name tuple, description, display icon integer, active by default flag)
Returns:
int: number of items imported
list: errors
"""
return import_data(db_map, entity_classes=data)
[docs]def import_entities(db_map, data):
"""Imports entities into a Spine database using a standard format.
Args:
db_map (spinedb_api.DiffDatabaseMapping): database mapping
data: (list(tuple(str,str or tuple(str)): tuples of (class name, entity name or element name list)
Returns:
int: number of items imported
list: errors
"""
return import_data(db_map, entities=data)
[docs]def import_entity_alternatives(db_map, data):
"""Imports entity alternatives into a Spine database using a standard format.
Args:
db_map (spinedb_api.DiffDatabaseMapping): database mapping
data: (list(tuple(str,str or tuple(str),str,bool): tuples of
(class name, entity name or element name list, alternative name, activity)
Returns:
int: number of items imported
list: errors
"""
return import_data(db_map, entity_alternatives=data)
[docs]def import_entity_groups(db_map, data):
"""Imports entity groups into a Spine database using a standard format.
Args:
db_map (spinedb_api.DiffDatabaseMapping): database mapping
data (list(tuple(str,str,str))): tuples of (class name, group entity name, member entity name)
Returns:
int: number of items imported
list: errors
"""
return import_data(db_map, entity_groups=data)
[docs]def import_parameter_definitions(db_map, data, unparse_value=to_database):
"""Imports parameter definitions into a Spine database using a standard format.
Args:
db_map (spinedb_api.DiffDatabaseMapping): database mapping
data (list(tuple(str,str,str,str)):
tuples of (class name, parameter name, default value, parameter value list name)
Returns:
int: number of items imported
list: errors
"""
return import_data(db_map, parameter_definitions=data, unparse_value=unparse_value)
[docs]def import_parameter_values(db_map, data, unparse_value=to_database, on_conflict="merge"):
"""Imports parameter values into a Spine database using a standard format.
Args:
db_map (spinedb_api.DiffDatabaseMapping): database mapping
data (list(tuple(str,str or tuple(str),str,str|numeric,str]):
tuples of (class name, entity name or element name list, parameter name, value, alternative name)
on_conflict (str): Conflict resolution strategy for :func:`~spinedb_api.parameter_value.fix_conflict`
Returns:
int: number of items imported
list: errors
"""
return import_data(db_map, parameter_values=data, unparse_value=unparse_value, on_conflict=on_conflict)
[docs]def import_alternatives(db_map, data):
"""Imports alternatives into a Spine database using a standard format.
Args:
db_map (spinedb_api.DiffDatabaseMapping): database mapping
data (list(str,str)): tuples of (name, description)
Returns:
int: number of items imported
list: errors
"""
return import_data(db_map, alternatives=data)
[docs]def import_scenarios(db_map, data):
"""Imports scenarios into a Spine database using a standard format.
Args:
db_map (spinedb_api.DiffDatabaseMapping): database mapping
data (list(str, bool, str)): tuples of (name, <unused_bool>, description)
Returns:
int: number of items imported
list: errors
"""
return import_data(db_map, scenarios=data)
[docs]def import_scenario_alternatives(db_map, data):
"""Imports scenario alternatives into a Spine database using a standard format.
Args:
db_map (spinedb_api.DiffDatabaseMapping): database mapping
data (list(str,str,str)): tuples of (scenario name, alternative name, preceeding alternative name)
Returns:
int: number of items imported
list: errors
"""
return import_data(db_map, scenario_alternatives=data)
[docs]def import_parameter_value_lists(db_map, data, unparse_value=to_database):
"""Imports parameter value lists into a Spine database using a standard format.
Args:
db_map (spinedb_api.DiffDatabaseMapping): database mapping
data (list(str,str|numeric)): tuples of (list name, value)
Returns:
int: number of items imported
list: errors
"""
return import_data(db_map, parameter_value_lists=data, unparse_value=unparse_value)
def import_object_classes(db_map, data):
return import_data(db_map, object_classes=data)
def import_relationship_classes(db_map, data):
return import_data(db_map, relationship_classes=data)
def import_objects(db_map, data):
return import_data(db_map, objects=data)
def import_object_groups(db_map, data):
return import_data(db_map, object_groups=data)
def import_relationships(db_map, data):
return import_data(db_map, relationships=data)
def import_object_parameters(db_map, data, unparse_value=to_database):
return import_data(db_map, object_parameters=data, unparse_value=unparse_value)
def import_relationship_parameters(db_map, data, unparse_value=to_database):
return import_data(db_map, relationship_parameters=data, unparse_value=unparse_value)
def import_object_parameter_values(db_map, data, unparse_value=to_database, on_conflict="merge"):
return import_data(db_map, object_parameter_values=data, unparse_value=unparse_value, on_conflict=on_conflict)
def import_relationship_parameter_values(db_map, data, unparse_value=to_database, on_conflict="merge"):
return import_data(db_map, relationship_parameter_values=data, unparse_value=unparse_value, on_conflict=on_conflict)
def import_object_metadata(db_map, data):
return import_data(db_map, object_metadata=data)
def import_relationship_metadata(db_map, data):
return import_data(db_map, relationship_metadata=data)
def import_object_parameter_value_metadata(db_map, data):
return import_data(db_map, object_parameter_value_metadata=data)
def import_relationship_parameter_value_metadata(db_map, data):
return import_data(db_map, relationship_parameter_value_metadata=data)
def _get_entity_classes_for_import(db_map, data):
dim_name_list_by_name = {}
items = []
key = ("name", "dimension_name_list", "description", "display_icon", "active_by_default")
for x in data:
if isinstance(x, str):
x = x, ()
name, *optionals = x
dim_name_list = optionals.pop(0) if optionals else ()
item = dict(zip(key, (name, dim_name_list, *optionals)))
items.append(item)
dim_name_list_by_name[name] = dim_name_list
def _ref_count(name):
dim_name_list = dim_name_list_by_name.get(name, ())
return len(dim_name_list) + sum((_ref_count(dim_name) for dim_name in dim_name_list), start=0)
items_by_ref_count = {}
for item in items:
items_by_ref_count.setdefault(_ref_count(item["name"]), []).append(item)
return (items_by_ref_count[ref_count] for ref_count in sorted(items_by_ref_count))
def _get_superclass_subclasses_for_import(db_map, data):
key = ("superclass_name", "subclass_name")
return (dict(zip(key, x)) for x in data)
def _get_entities_for_import(db_map, data):
items_by_el_count = {}
key = ("entity_class_name", "entity_byname", "description")
for class_name, name_or_el_name_list, *optionals in data:
if isinstance(name_or_el_name_list, (list, tuple)):
el_count = len(name_or_el_name_list)
byname = name_or_el_name_list
else:
el_count = 0
byname = (name_or_el_name_list,)
item = dict(zip(key, (class_name, byname, *optionals)))
items_by_el_count.setdefault(el_count, []).append(item)
return (items_by_el_count[el_count] for el_count in sorted(items_by_el_count))
def _get_entity_alternatives_for_import(db_map, data):
for class_name, entity_name_or_element_name_list, alternative, active in data:
is_zero_dim = isinstance(entity_name_or_element_name_list, str)
entity_byname = (entity_name_or_element_name_list,) if is_zero_dim else entity_name_or_element_name_list
key = ("entity_class_name", "entity_byname", "alternative_name", "active")
yield dict(zip(key, (class_name, entity_byname, alternative, active)))
def _get_entity_groups_for_import(db_map, data):
key = ("entity_class_name", "group_name", "member_name")
return (dict(zip(key, x)) for x in data)
def _get_parameter_definitions_for_import(db_map, data, unparse_value):
key = ("entity_class_name", "name", "default_value", "default_type", "parameter_value_list_name", "description")
for class_name, parameter_name, *optionals in data:
if not optionals:
yield dict(zip(key, (class_name, parameter_name)))
continue
value = optionals.pop(0)
value, type_ = unparse_value(value)
yield dict(zip(key, (class_name, parameter_name, value, type_, *optionals)))
def _get_parameter_values_for_import(db_map, data, unparse_value, on_conflict):
seen = set()
errors = []
items = []
key = ("entity_class_name", "entity_byname", "parameter_definition_name", "alternative_name", "value", "type")
for class_name, entity_byname, parameter_name, value, *optionals in data:
if isinstance(entity_byname, str):
entity_byname = (entity_byname,)
else:
entity_byname = tuple(entity_byname)
alternative_name = optionals[0] if optionals else db_map.get_import_alternative_name()
unique_values = (class_name, entity_byname, parameter_name, alternative_name)
if unique_values in seen:
dupe = dict(zip(key, unique_values))
errors.append(
f"attempting to import more than one parameter_value with {dupe} - only first will be considered"
)
continue
seen.add(unique_values)
value, type_ = unparse_value(value)
item = dict(zip(key, unique_values + (None, None)))
pv = db_map.mapped_table("parameter_value").find_item(item)
if pv:
value, type_ = fix_conflict((value, type_), (pv["value"], pv["type"]), on_conflict)
item.update({"value": value, "type": type_})
items.append(item)
return items, errors
def _get_alternatives_for_import(db_map, data):
key = ("name", "description")
return ({"name": x} if isinstance(x, str) else dict(zip(key, x)) for x in data)
def _get_scenarios_for_import(db_map, data):
key = ("name", "active", "description")
return ({"name": x} if isinstance(x, str) else dict(zip(key, x)) for x in data)
def _get_scenario_alternatives_for_import(db_map, data):
# FIXME: maybe when updating, we only want to match by (scen_name, alt_name) and not by (scen_name, rank)
alt_name_list_by_scen_name = {}
succ_by_pred_by_scen_name = defaultdict(dict)
for scen_name, predecessor, *optionals in data:
successor = optionals[0] if optionals else None
succ_by_pred_by_scen_name[scen_name][predecessor] = successor
for scen_name, succ_by_pred in succ_by_pred_by_scen_name.items():
scen = db_map.mapped_table("scenario").find_item({"name": scen_name})
alternative_name_list = alt_name_list_by_scen_name[scen_name] = scen.get("alternative_name_list", [])
alternative_name_list.append(None) # So alternatives where successor is None find their place at the tail
while succ_by_pred:
some_added = False
for pred, succ in list(succ_by_pred.items()):
if succ in alternative_name_list:
if pred in alternative_name_list:
alternative_name_list.remove(pred)
i = alternative_name_list.index(succ)
alternative_name_list.insert(i, pred)
del succ_by_pred[pred]
some_added = True
if not some_added:
break
alternative_name_list.pop(-1) # Remove the None
items = (
{"scenario_name": scen_name, "alternative_name": alt_name, "rank": k + 1}
for scen_name, alternative_name_list in alt_name_list_by_scen_name.items()
for k, alt_name in enumerate(alternative_name_list)
)
errors = (
f"can't insert alternative '{pred}' before '{succ}' because the latter is not in scenario '{scen_name}'"
for scen, succ_by_pred in succ_by_pred_by_scen_name.items()
for pred, succ in succ_by_pred.items()
)
return items, errors
def _get_parameter_value_lists_for_import(db_map, data):
return ({"name": x} for x in {x[0]: None for x in data})
def _get_list_values_for_import(db_map, data, unparse_value):
index_by_list_name = {}
for list_name, value in data:
value, type_ = unparse_value(value)
index = index_by_list_name.get(list_name)
if index is None:
current_list = db_map.mapped_table("parameter_value_list").find_item({"name": list_name})
list_value_idx_by_val_typ = {
(x["value"], x["type"]): x["index"]
for x in db_map.mapped_table("list_value").valid_values()
if x["parameter_value_list_id"] == current_list["id"]
}
if (value, type_) in list_value_idx_by_val_typ:
continue
index = max((idx for idx in list_value_idx_by_val_typ.values()), default=-1)
index += 1
index_by_list_name[list_name] = index
yield {"parameter_value_list_name": list_name, "value": value, "type": type_, "index": index}
def _get_metadata_for_import(db_map, data):
for metadata in data:
for name, value in _parse_metadata(metadata):
yield {"name": name, "value": value}
def _get_entity_metadata_for_import(db_map, data):
key = ("entity_class_name", "entity_byname", "metadata_name", "metadata_value")
for class_name, entity_byname, metadata in data:
if isinstance(entity_byname, str):
entity_byname = (entity_byname,)
for name, value in _parse_metadata(metadata):
yield dict(zip(key, (class_name, entity_byname, name, value)))
def _get_parameter_value_metadata_for_import(db_map, data):
key = (
"entity_class_name",
"entity_byname",
"parameter_definition_name",
"metadata_name",
"metadata_value",
"alternative_name",
)
for class_name, entity_byname, parameter_name, metadata, *optionals in data:
if isinstance(entity_byname, str):
entity_byname = (entity_byname,)
alternative_name = optionals[0] if optionals else db_map.get_import_alternative_name()
for name, value in _parse_metadata(metadata):
yield dict(zip(key, (class_name, entity_byname, parameter_name, name, value, alternative_name)))
# Legacy
def _object_classes_to_entity_classes(data):
for x in data:
if isinstance(x, str):
yield x, ()
else:
name, *optionals = x
yield name, (), *optionals