######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Database API contributors
# This file is part of Spine Database API.
# Spine Toolbox is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""
Functions for importing data into a Spine database in a standard format.
This functionality is equivalent to the one provided by :meth:`.DatabaseMapping.add_update_item`,
but the syntax is a little more compact.
"""
from collections import defaultdict
from collections.abc import Callable, Iterable, Iterator, Sequence
from contextlib import suppress
from typing import Any, Optional, TypeAlias
from . import DatabaseMapping, SpineDBAPIError
from .helpers import DisplayStatusValue, ItemType
from .parameter_value import (
ConflictResolution,
ConflictResolutionCallable,
Value,
fancy_type_to_type_and_rank,
get_conflict_fixer,
to_database,
)
UnparseCallable: TypeAlias = Callable[[Value], tuple[bytes, Optional[str]]]
Alternative: TypeAlias = str | tuple[str] | tuple[str, str]
Scenario: TypeAlias = str | tuple[str] | tuple[str, bool] | tuple[str, bool, str]
ScenarioAlternative: TypeAlias = tuple[str, str] | tuple[str, str, str]
Location: TypeAlias = tuple[float, float, float, str, str]
Entity: TypeAlias = (
tuple[str, str | Sequence[str]]
| tuple[str, str | Sequence[str], str]
| tuple[str, str | Sequence[str], str, Location]
)
EntityAlternative: TypeAlias = tuple[str, str | Sequence[str], str, bool]
EntityGroup: TypeAlias = tuple[str, str, str]
EntityClass: TypeAlias = (
str
| tuple[str]
| tuple[str, Sequence[str]]
| tuple[str, Sequence[str], str]
| tuple[str, Sequence[str], str, int]
| tuple[str, Sequence[str], str, int, bool]
)
ParameterValueList: TypeAlias = tuple[str, Any]
ParameterValue: TypeAlias = tuple[str, str | Sequence[str], str, Any] | tuple[str, str | Sequence[str], str, Any, str]
ParameterGroup: TypeAlias = tuple[str, str, int]
ParameterType: TypeAlias = tuple[str, str, str] | tuple[str, str, str, str]
ParameterDefinition: TypeAlias = (
tuple[str, str]
| tuple[str, str, Any]
| tuple[str, str, Any, str]
| tuple[str, str, Any, str, str]
| tuple[str, str, Any, str, str, str]
| tuple[str, str, Any, str, str, str, str]
)
SuperclassSubclass: TypeAlias = tuple[str, str]
DisplayMode: TypeAlias = str | tuple[str] | tuple[str, DisplayStatusValue]
EntityClassDisplayMode: TypeAlias = (
tuple[str, str, int]
| tuple[str, str, int, DisplayStatusValue]
| tuple[str, str, int, DisplayStatusValue, str]
| tuple[str, str, int, DisplayStatusValue, str, str]
)
Metadata: TypeAlias = tuple[str, str]
EntityMetadata: TypeAlias = tuple[str, str | Sequence[str], str, str]
ParameterValueMetadata: TypeAlias = (
tuple[str, str | Sequence[str], str, str, str] | tuple[str, str | Sequence[str], str, str, str, str]
)
[docs]
def import_data(
db_map: DatabaseMapping,
unparse_value: UnparseCallable = to_database,
on_conflict: ConflictResolution = "merge",
**kwargs,
) -> tuple[int, list[str]]:
"""Imports data into a Spine database using a standard format.
Example::
entity_classes = [
('example_class', ()), ('other_class', ()), ('multi_d_class', ('example_class', 'other_class'))
]
alternatives = [('example_alternative', 'An example')]
scenarios = [('example_scenario', 'An example')]
scenario_alternatives = [
('example_scenario', 'example_alternative'), ('example_scenario', 'Base', 'example_alternative')
]
parameter_value_lists = [("example_list", "value1"), ("example_list", "value2")]
parameter_definitions = [('example_class', 'example_parameter'), ('multi_d_class', 'other_parameter')]
entities = [
('example_class', 'example_entity'),
('example_class', 'example_group'),
('example_class', 'example_member'),
('other_class', 'other_entity'),
('multi_d_class', ('example_entity', 'other_entity')),
]
entity_groups = [
('example_class', 'example_group', 'example_member'),
('example_class', 'example_group', 'example_entity'),
]
parameter_values = [
('example_object_class', 'example_entity', 'example_parameter', 3.14),
('multi_d_class', ('example_entity', 'other_entity'), 'rel_parameter', 2.718),
]
entity_alternatives = [
('example_class', 'example_entity', "example_alternative", True),
('example_class', 'example_entity', "example_alternative", False),
]
import_data(
db_map,
entity_classes=entity_classes,
alternatives=alternatives,
scenarios=scenarios,
scenario_alternatives=scenario_alternatives,
parameter_value_lists=parameter_value_lists,
parameter_definitions=parameter_definitions,
entities=entities,
entity_groups=entity_groups,
parameter_values=parameter_values,
entity_alternatives=entity_alternatives,
)
Args:
db_map: database mapping
unparse_value: function to call to parse parameter values
on_conflict: Conflict resolution strategy for :func:`parameter_value.fix_conflict`
**kwargs: data to import
Returns:
number of items imported and list of errors
"""
all_errors: list[str] = []
num_imports = 0
conflict_fixer = get_conflict_fixer(on_conflict)
for item_type, items in get_data_for_import(
db_map, all_errors, unparse_value=unparse_value, fix_value_conflict=conflict_fixer, **kwargs
):
added, updated, errors = db_map.add_update_items(item_type, *items, strict=False)
num_imports += len(added + updated)
all_errors.extend(errors)
return num_imports, all_errors
[docs]
def get_data_for_import(
db_map: DatabaseMapping,
all_errors: list[str],
unparse_value: UnparseCallable = to_database,
fix_value_conflict: ConflictResolutionCallable = get_conflict_fixer("merge"),
entity_classes: Iterable[EntityClass] = (),
entities: Iterable[Entity] = (),
entity_groups: Iterable[EntityGroup] = (),
entity_alternatives: Iterable[EntityAlternative] = (),
parameter_definitions: Iterable[ParameterDefinition] = (),
parameter_types: Iterable[ParameterType] = (),
parameter_values: Iterable[ParameterValue] = (),
parameter_value_lists: Iterable[ParameterValueList] = (),
parameter_groups: Iterable[ParameterGroup] = (),
alternatives: Iterable[Alternative] = (),
scenarios: Iterable[Scenario] = (),
scenario_alternatives: Iterable[ScenarioAlternative] = (),
metadata: Iterable[Metadata] = (),
entity_metadata: Iterable[EntityMetadata] = (),
parameter_value_metadata: Iterable[ParameterValueMetadata] = (),
superclass_subclasses: Iterable[SuperclassSubclass] = (),
display_modes: Iterable[DisplayMode] = (),
entity_class_display_modes: Iterable[EntityClassDisplayMode] = (),
# legacy
object_classes=(),
relationship_classes=(),
object_parameters=(),
relationship_parameters=(),
objects=(),
relationships=(),
object_groups=(),
object_parameter_values=(),
relationship_parameter_values=(),
object_metadata=(),
relationship_metadata=(),
object_parameter_value_metadata=(),
relationship_parameter_value_metadata=(),
# removed
tools=(),
features=(),
tool_features=(),
tool_feature_methods=(),
) -> Iterator[tuple[ItemType, Iterable[dict]]]:
"""Yields data to import into a Spine DB.
Args:
db_map: database mapping
all_errors: errors encountered during import
unparse_value: function to call when parsing parameter values
fix_value_conflict: parameter value conflict resolution function
entity_classes: entity class tuples
parameter_definitions: tuples of parameter definitions
parameter_types: tuples of parameter types
parameter_groups: tuples of parameter groups
entities: tuples of entities
entity_alternatives: tuples of entity alternatives
entity_groups: tuples of entity groups
parameter_values: tuples of parameter values
alternatives: tuples of alternatives
scenarios: tuples of scenarios
scenario_alternatives: tuples of scenario alternatives
parameter_value_lists: tuples of parameter value lists
metadata: tuples of metadata
entity_metadata: tuples of entity metadata
parameter_value_metadata: tuples of parameter value metadata
superclass_subclasses: tuples of superclass subclasses
display_modes: tuples of display modes
entity_class_display_modes: tuples of entity class display modes
Yields:
tuple of (item type, item dicts)
"""
# NOTE: The order is important, because of references. E.g., we want to import alternatives before parameter_values
if alternatives:
yield ("alternative", _get_alternatives_for_import(alternatives))
if scenarios:
yield ("scenario", _get_scenarios_for_import(scenarios))
if scenario_alternatives:
yield ("scenario_alternative", _get_scenario_alternatives_for_import(db_map, scenario_alternatives, all_errors))
if entity_classes:
for bucket in _get_entity_classes_for_import(entity_classes):
yield ("entity_class", bucket)
if object_classes: # Legacy
yield from get_data_for_import(
db_map, all_errors, entity_classes=_object_classes_to_entity_classes(object_classes)
)
if relationship_classes: # Legacy
yield from get_data_for_import(db_map, all_errors, entity_classes=relationship_classes)
if superclass_subclasses:
yield ("superclass_subclass", _get_superclass_subclasses_for_import(superclass_subclasses))
if entities:
for bucket in _get_entities_for_import(entities):
yield ("entity", bucket)
if objects: # Legacy
yield from get_data_for_import(db_map, all_errors, entities=objects)
if relationships: # Legacy
yield from get_data_for_import(db_map, all_errors, entities=relationships)
if entity_alternatives:
yield ("entity_alternative", _get_entity_alternatives_for_import(entity_alternatives))
if entity_groups:
yield ("entity_group", _get_entity_groups_for_import(entity_groups))
if object_groups: # Legacy
yield from get_data_for_import(db_map, all_errors, entity_groups=object_groups)
if parameter_value_lists:
yield ("parameter_value_list", _get_parameter_value_lists_for_import(parameter_value_lists))
yield ("list_value", _get_list_values_for_import(db_map, parameter_value_lists, unparse_value))
if parameter_groups:
yield ("parameter_group", _get_parameter_groups_for_import(parameter_groups))
if parameter_definitions:
yield (
"parameter_definition",
_get_parameter_definitions_for_import(parameter_definitions, unparse_value),
)
if object_parameters: # Legacy
yield from get_data_for_import(
db_map, all_errors, unparse_value=unparse_value, parameter_definitions=object_parameters
)
if relationship_parameters: # Legacy
yield from get_data_for_import(
db_map, all_errors, unparse_value=unparse_value, parameter_definitions=relationship_parameters
)
if parameter_types:
yield ("parameter_type", _get_parameter_types_for_import(parameter_types, all_errors))
if parameter_values:
yield (
"parameter_value",
_get_parameter_values_for_import(db_map, parameter_values, all_errors, unparse_value, fix_value_conflict),
)
if object_parameter_values: # Legacy
yield from get_data_for_import(
db_map,
all_errors,
unparse_value=unparse_value,
fix_value_conflict=fix_value_conflict,
parameter_values=object_parameter_values,
)
if relationship_parameter_values: # Legacy
yield from get_data_for_import(
db_map,
all_errors,
unparse_value=unparse_value,
fix_value_conflict=fix_value_conflict,
parameter_values=relationship_parameter_values,
)
if metadata:
yield ("metadata", _get_metadata_for_import(metadata))
if entity_metadata:
yield ("metadata", _get_metadata_for_import(((item[2], item[3]) for item in entity_metadata)))
yield ("entity_metadata", _get_entity_metadata_for_import(entity_metadata))
if parameter_value_metadata:
yield (
"metadata",
_get_metadata_for_import(((item[3], item[4]) for item in parameter_value_metadata)),
)
yield ("parameter_value_metadata", _get_parameter_value_metadata_for_import(db_map, parameter_value_metadata))
if object_metadata: # Legacy
yield from get_data_for_import(db_map, all_errors, entity_metadata=object_metadata)
if relationship_metadata: # Legacy
yield from get_data_for_import(db_map, all_errors, entity_metadata=relationship_metadata)
if object_parameter_value_metadata: # Legacy
yield from get_data_for_import(db_map, all_errors, parameter_value_metadata=object_parameter_value_metadata)
if relationship_parameter_value_metadata: # Legacy
yield from get_data_for_import(
db_map, all_errors, parameter_value_metadata=relationship_parameter_value_metadata
)
if display_modes:
yield (
"display_mode",
_get_display_modes_for_import(display_modes),
)
if entity_class_display_modes:
yield (
"entity_class_display_mode",
_get_entity_class_display_modes_for_import(entity_class_display_modes),
)
[docs]
def import_superclass_subclasses(db_map: DatabaseMapping, data: Iterable[SuperclassSubclass]) -> tuple[int, list[str]]:
"""Imports superclass_subclasses into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (superclass name, subclass name)
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, superclass_subclasses=data)
[docs]
def import_entity_classes(db_map: DatabaseMapping, data: Iterable[EntityClass]) -> tuple[int, list[str]]:
"""Imports entity classes into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (name, [(dimension 1 name, dimension 2 name ,...)], [description], [display icon integer], [active by default])
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, entity_classes=data)
[docs]
def import_entities(db_map: DatabaseMapping, data: Iterable[Entity]) -> tuple[int, list[str]]:
"""Imports entities into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (class name, entity name or byname, [description], [location])
where location is a tuple of (latitude, longitude, altitude, shape name, shape GEOJSON)
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, entities=data)
[docs]
def import_entity_alternatives(db_map: DatabaseMapping, data: Iterable[EntityAlternative]) -> tuple[int, list[str]]:
"""Imports entity alternatives into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (class name, entity name or byname, alternative name, activity)
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, entity_alternatives=data)
[docs]
def import_entity_groups(db_map: DatabaseMapping, data: Iterable[EntityGroup]) -> tuple[int, list[str]]:
"""Imports entity groups into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (class name, group entity name, member entity name)
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, entity_groups=data)
[docs]
def import_parameter_definitions(
db_map: DatabaseMapping, data: Iterable[ParameterDefinition], unparse_value: UnparseCallable = to_database
) -> tuple[int, list[str]]:
"""Imports parameter definitions into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (class name, parameter name, [default value], [parameter value list name], [description], [parameter group name])
unparse_value: function to parse parameter values
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, parameter_definitions=data, unparse_value=unparse_value)
[docs]
def import_parameter_types(
db_map: DatabaseMapping, data: Iterable[ParameterType], unparse_value: UnparseCallable = to_database
) -> tuple[int, list[str]]:
"""Imports parameter types into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuple of (class name, parameter name, type, [succeeding type])
unparse_value: function to parse parameter values
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, parameter_types=data, unparse_value=unparse_value)
[docs]
def import_parameter_values(
db_map: DatabaseMapping,
data: Iterable[ParameterValue],
unparse_value: UnparseCallable = to_database,
on_conflict: ConflictResolution = "merge",
) -> tuple[int, list[str]]:
"""Imports parameter values into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (class name, entity name or byname, parameter definition name, value, [alternative_name])
unparse_value: function to parse parameter values
on_conflict: Conflict resolution strategy; options: "keep", "replace", "merge"
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, parameter_values=data, unparse_value=unparse_value, on_conflict=on_conflict)
[docs]
def import_alternatives(db_map: DatabaseMapping, data: Iterable[Alternative]) -> tuple[int, list[str]]:
"""Imports alternatives into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (name, [description])
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, alternatives=data)
[docs]
def import_scenarios(db_map: DatabaseMapping, data: Iterable[Scenario]) -> tuple[int, list[str]]:
"""Imports scenarios into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (name, [<unused bool>], [description])
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, scenarios=data)
[docs]
def import_display_modes(db_map: DatabaseMapping, data: Iterable[DisplayMode]) -> tuple[int, list[str]]:
"""Imports display modes into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (name, [description])
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, display_modes=data)
[docs]
def import_entity_class_display_modes(
db_map: DatabaseMapping, data: Iterable[EntityClassDisplayMode]
) -> tuple[int, list[str]]:
"""Imports entity class display modes into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (display mode name, entity class name, display order)
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, entity_class_display_modes=data)
[docs]
def import_scenario_alternatives(db_map: DatabaseMapping, data: Iterable[ScenarioAlternative]) -> tuple[int, list[str]]:
"""Imports scenario alternatives into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (scenario, alternative, [succeeding alternative])
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, scenario_alternatives=data)
[docs]
def import_parameter_value_lists(
db_map: DatabaseMapping,
data: Iterable[ParameterValueList],
unparse_value: UnparseCallable = to_database,
) -> tuple[int, list[str]]:
"""Imports parameter value lists into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (list name, value)
unparse_value: function to parse parameter values
Returns:
tuple of (number of items imported, list of errors)
"""
return import_data(db_map, parameter_value_lists=data, unparse_value=unparse_value)
[docs]
def import_parameter_groups(db_map: DatabaseMapping, data: Iterable[ParameterGroup]) -> tuple[int, list[str]]:
"""Imports parameter groups into a Spine database using a standard format.
Args:
db_map: database mapping
data: tuples of (group name, color as 6-digit HEX value)
Returns:
tuple of (number of groups imported, list of errors)
"""
return import_data(db_map, parameter_groups=data)
def import_object_classes(db_map, data):
return import_data(db_map, object_classes=data)
def import_relationship_classes(db_map, data):
return import_data(db_map, relationship_classes=data)
def import_objects(db_map, data):
return import_data(db_map, objects=data)
def import_object_groups(db_map, data):
return import_data(db_map, object_groups=data)
def import_relationships(db_map, data):
return import_data(db_map, relationships=data)
def import_object_parameters(db_map, data, unparse_value=to_database):
return import_data(db_map, object_parameters=data, unparse_value=unparse_value)
def import_relationship_parameters(db_map, data, unparse_value=to_database):
return import_data(db_map, relationship_parameters=data, unparse_value=unparse_value)
def import_object_parameter_values(db_map, data, unparse_value=to_database, on_conflict="merge"):
return import_data(db_map, object_parameter_values=data, unparse_value=unparse_value, on_conflict=on_conflict)
def import_relationship_parameter_values(db_map, data, unparse_value=to_database, on_conflict="merge"):
return import_data(db_map, relationship_parameter_values=data, unparse_value=unparse_value, on_conflict=on_conflict)
def import_object_metadata(db_map, data):
return import_data(db_map, object_metadata=data)
def import_relationship_metadata(db_map, data):
return import_data(db_map, relationship_metadata=data)
def import_object_parameter_value_metadata(db_map, data):
return import_data(db_map, object_parameter_value_metadata=data)
def import_relationship_parameter_value_metadata(db_map, data):
return import_data(db_map, relationship_parameter_value_metadata=data)
def _get_entity_classes_for_import(data: Iterable[EntityClass]) -> Iterable[dict]:
dim_name_list_by_name = {}
items = []
key = ("name", "dimension_name_list", "description", "display_icon", "active_by_default")
for x in data:
if isinstance(x, str):
x = x, ()
name, *optionals = x
dim_name_list = optionals.pop(0) if optionals else ()
item = dict(zip(key, (name, dim_name_list, *optionals)))
items.append(item)
dim_name_list_by_name[name] = dim_name_list
def _ref_count(name):
dim_name_list = dim_name_list_by_name.get(name, ())
return len(dim_name_list) + sum((_ref_count(dim_name) for dim_name in dim_name_list), start=0)
items_by_ref_count = {}
for item in items:
items_by_ref_count.setdefault(_ref_count(item["name"]), []).append(item)
return (items_by_ref_count[ref_count] for ref_count in sorted(items_by_ref_count))
def _get_superclass_subclasses_for_import(data: Iterable[SuperclassSubclass]) -> Iterable[dict]:
key = ("superclass_name", "subclass_name")
return (dict(zip(key, x)) for x in data)
def _get_display_modes_for_import(data: Iterable[DisplayMode]) -> Iterable[dict]:
key = ("name", "description")
return ({"name": x} if isinstance(x, str) else dict(zip(key, x)) for x in data)
def _get_entity_class_display_modes_for_import(data: Iterable[EntityClassDisplayMode]) -> Iterator[dict]:
key = (
"display_mode_name",
"entity_class_name",
"display_order",
"display_status",
"display_font_color",
"display_background_color",
)
for display_mode_name, entity_class_name, display_order, *optionals in data:
yield dict(zip(key, (display_mode_name, entity_class_name, display_order, *optionals)))
def _get_entities_for_import(data: Iterable[Entity]) -> Iterable[list[dict]]:
items_by_el_count: dict[int, list[dict]] = {}
key = ("entity_class_name", "entity_byname", "description", "location")
for class_name, name_or_el_name_list, *optionals in data:
if isinstance(name_or_el_name_list, str):
el_count = 0
byname = (name_or_el_name_list,)
else:
el_count = len(name_or_el_name_list)
byname = name_or_el_name_list
item = dict(zip(key, (class_name, byname, *optionals)))
if "location" in item:
location = item.pop("location")
if location is not None:
with suppress(IndexError):
item["lat"] = location[0]
item["lon"] = location[1]
item["alt"] = location[2]
item["shape_name"] = location[3]
item["shape_blob"] = location[4]
items_by_el_count.setdefault(el_count, []).append(item)
return (items_by_el_count[el_count] for el_count in sorted(items_by_el_count))
def _get_entity_alternatives_for_import(data: Iterable[EntityAlternative]) -> Iterable[dict]:
for class_name, entity_name_or_element_name_list, alternative, active in data:
is_zero_dim = isinstance(entity_name_or_element_name_list, str)
entity_byname = (entity_name_or_element_name_list,) if is_zero_dim else entity_name_or_element_name_list
key = ("entity_class_name", "entity_byname", "alternative_name", "active")
yield dict(zip(key, (class_name, entity_byname, alternative, active)))
def _get_entity_groups_for_import(data: Iterable[EntityGroup]) -> Iterable[dict]:
key = ("entity_class_name", "group_name", "member_name")
return (dict(zip(key, x)) for x in data)
def _get_parameter_definitions_for_import(
data: Iterable[ParameterDefinition], unparse_value: UnparseCallable
) -> Iterator[dict]:
key = (
"entity_class_name",
"name",
"default_value",
"default_type",
"parameter_value_list_name",
"description",
"parameter_group_name",
)
for class_name, parameter_name, *optionals in data:
if not optionals:
yield dict(zip(key, (class_name, parameter_name)))
continue
value = optionals.pop(0)
value, type_ = unparse_value(value)
yield dict(zip(key, (class_name, parameter_name, value, type_, *optionals)))
def _get_parameter_values_for_import(
db_map: DatabaseMapping,
data: Iterable[ParameterValue],
all_errors: list[str],
unparse_value: UnparseCallable,
fix_conflict: ConflictResolutionCallable,
) -> Iterator[dict]:
seen = set()
key = ("entity_class_name", "entity_byname", "parameter_definition_name", "alternative_name", "value", "type")
parameter_value_table = db_map.mapped_table("parameter_value")
for class_name, entity_byname, parameter_name, value, *optionals in data:
if isinstance(entity_byname, str):
entity_byname = (entity_byname,)
else:
entity_byname = tuple(entity_byname)
alternative_name = optionals[0] if optionals else db_map.get_import_alternative_name()
unique_values = (class_name, entity_byname, parameter_name, alternative_name)
if unique_values in seen:
dupe = dict(zip(key, unique_values))
all_errors.append(
f"attempting to import more than one parameter_value with {dupe} - only first will be considered"
)
continue
seen.add(unique_values)
value, type_ = unparse_value(value)
item = dict(zip(key, unique_values + (None, None)))
try:
pv = parameter_value_table.find_item(item)
except SpineDBAPIError:
pass
else:
value, type_ = fix_conflict((value, type_), (pv["value"], pv["type"]))
item.update({"value": value, "type": type_})
yield item
def _get_alternatives_for_import(data: Iterable[Alternative]) -> Iterable[dict]:
key = ("name", "description")
return ({"name": x} if isinstance(x, str) else dict(zip(key, x)) for x in data)
def _get_scenarios_for_import(data: Iterable[Scenario]) -> Iterable[dict]:
key = ("name", "active", "description")
return ({"name": x} if isinstance(x, str) else dict(zip(key, x)) for x in data)
def _get_scenario_alternatives_for_import(
db_map: DatabaseMapping, data: Iterable[ScenarioAlternative], all_errors: list[str]
) -> Iterable[dict]:
# FIXME: maybe when updating, we only want to match by (scen_name, alt_name) and not by (scen_name, rank)
alt_name_list_by_scen_name = {}
succ_by_pred_by_scen_name = defaultdict(dict)
for scen_name, predecessor, *optionals in data:
successor = optionals[0] if optionals else None
succ_by_pred_by_scen_name[scen_name][predecessor] = successor
scenario_table = db_map.mapped_table("scenario")
for scen_name, succ_by_pred in succ_by_pred_by_scen_name.items():
try:
scen = scenario_table.find_item({"name": scen_name})
except SpineDBAPIError:
alternative_name_list = []
else:
alternative_name_list = scen.get("alternative_name_list", [])
alt_name_list_by_scen_name[scen_name] = alternative_name_list
alternative_name_list.append(None) # So alternatives where successor is None find their place at the tail
while succ_by_pred:
some_added = False
for pred, succ in list(succ_by_pred.items()):
if succ in alternative_name_list:
if pred in alternative_name_list:
alternative_name_list.remove(pred)
i = alternative_name_list.index(succ)
alternative_name_list.insert(i, pred)
del succ_by_pred[pred]
some_added = True
if not some_added:
break
alternative_name_list.pop(-1) # Remove the None
all_errors += [
f"can't insert alternative '{pred}' before '{succ}' because the latter is not in scenario '{scen}'"
for scen, succ_by_pred in succ_by_pred_by_scen_name.items()
for pred, succ in succ_by_pred.items()
]
for scen_name, alternative_name_list in alt_name_list_by_scen_name.items():
for k, alt_name in enumerate(alternative_name_list):
yield {"scenario_name": scen_name, "alternative_name": alt_name, "rank": k + 1}
def _get_parameter_value_lists_for_import(data: Iterable[ParameterValueList]) -> Iterable[dict]:
return ({"name": x} for x in {x[0]: None for x in data})
def _get_list_values_for_import(
db_map: DatabaseMapping, data: Iterable[ParameterValueList], unparse_value: UnparseCallable
) -> Iterator[dict]:
index_by_list_name = {}
db_map.fetch_all("list_value")
value_list_table = db_map.mapped_table("parameter_value_list")
for list_name, value in data:
value, type_ = unparse_value(value)
index = index_by_list_name.get(list_name)
if index is None:
current_list = value_list_table.find_item({"name": list_name})
current_list_id = current_list["id"]
list_value_idx_by_val_typ = {
(x["value"], x["type"]): x["index"]
for x in db_map.mapped_table("list_value").valid_values()
if x["parameter_value_list_id"] == current_list_id
}
if (value, type_) in list_value_idx_by_val_typ:
continue
index = max((idx for idx in list_value_idx_by_val_typ.values()), default=-1)
index += 1
index_by_list_name[list_name] = index
yield {"parameter_value_list_name": list_name, "value": value, "type": type_, "index": index}
def _get_parameter_types_for_import(data: Iterable[ParameterType], all_errors: list[str]) -> Iterator[dict]:
for class_name, definition_name, parameter_type, *optionals in data:
if not optionals:
if parameter_type == "map":
all_errors.append(f"Missing rank for map type for parameter {definition_name} in class {class_name}")
continue
try:
parameter_type, rank = fancy_type_to_type_and_rank(parameter_type)
except ValueError:
all_errors.append(
f"Failed to read rank from type '{parameter_type}' for parameter {definition_name} in class {class_name}"
)
continue
else:
rank = optionals[0]
yield {
"entity_class_name": class_name,
"parameter_definition_name": definition_name,
"type": parameter_type,
"rank": rank,
}
def _get_parameter_groups_for_import(data: Iterable[ParameterGroup]) -> Iterator[dict]:
for parameter_group in data:
yield {"name": parameter_group[0], "color": parameter_group[1], "priority": parameter_group[2]}
def _get_metadata_for_import(data: Iterable[Metadata]) -> Iterator[dict]:
for metadata in data:
yield {"name": metadata[0], "value": metadata[1]}
def _get_entity_metadata_for_import(data: Iterable[EntityMetadata]) -> Iterator[dict]:
key = ("entity_class_name", "entity_byname", "metadata_name", "metadata_value")
for class_name, entity_byname, metadata_name, metadata_value in data:
if isinstance(entity_byname, str):
entity_byname = (entity_byname,)
yield dict(zip(key, (class_name, entity_byname, metadata_name, metadata_value)))
def _get_parameter_value_metadata_for_import(
db_map: DatabaseMapping, data: Iterable[ParameterValueMetadata]
) -> Iterator[dict]:
key = (
"entity_class_name",
"entity_byname",
"parameter_definition_name",
"metadata_name",
"metadata_value",
"alternative_name",
)
for class_name, entity_byname, parameter_name, metadata_name, metadata_value, *optionals in data:
if isinstance(entity_byname, str):
entity_byname = (entity_byname,)
alternative_name = optionals[0] if optionals else db_map.get_import_alternative_name()
yield dict(
zip(key, (class_name, entity_byname, parameter_name, metadata_name, metadata_value, alternative_name))
)
# Legacy
def _object_classes_to_entity_classes(data):
for x in data:
if isinstance(x, str):
yield x, ()
else:
name, *optionals = x
yield name, (), *optionals