######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Database API contributors
# This file is part of Spine Database API.
# Spine Database API is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser
# General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""
Parameter values in a Spine DB can be of different types (see :ref:`parameter_value_format`).
For each of these types, this module provides a Python class to represent values of that type.
.. list-table:: Parameter value type and Python class
:header-rows: 1
* - type
- Python class
* - ``date_time``
- :class:`DateTime`
* - ``duration``
- :class:`Duration`
* - ``array``
- :class:`Array`
* - ``time_pattern``
- :class:`TimePattern`
* - ``time_series``
- :class:`TimeSeriesFixedResolution` and :class:`TimeSeriesVariableResolution`
* - ``map``
- :class:`Map`
The module also provides the functions :func:`to_database` and :func:`from_database`
to translate between instances of the above classes and their DB representation (namely, the `value` and `type` fields
that would go in the ``parameter_value`` table).
For example, to write a Python object into a parameter value in the DB::
# Create the Python object
parsed_value = TimeSeriesFixedResolution(
"2023-01-01T00:00", # start
"1D", # resolution
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0], # values
ignore_year=False,
repeat=False,
)
# Translate it to value and type
value, type_ = to_database(parsed_value)
# Add a parameter_value to the DB with that value and type
with DatabaseMapping(url) as db_map:
db_map.add_parameter_value_item(
entity_class_name="cat",
entity_byname=("Tom",),
parameter_definition_name="number_of_lives",
alternative_name="Base",
value=value,
type=type_,
)
db_map.commit_session("Tom is living one day at a time")
The value can be accessed as a Python object using the ``parsed_value`` field::
# Get the parameter_value from the DB
with DatabaseMapping(url) as db_map:
pval_item = db_map.get_parameter_value_item(
entity_class_name="cat",
entity_byname=("Tom",),
parameter_definition_name="number_of_lives",
alternative_name="Base",
)
value = pval_item["parsed_value"]
In the rare case where a manual conversion from a DB value to Python object is needed,
use :func:`.from_database`::
# Obtain value and type
value, type_ = pval_item["value"], pval_item["type"]
# Translate value and type to a Python object manually
parsed_value = from_database(value, type_)
"""
from __future__ import annotations
from collections.abc import Callable, Iterable, Sequence
from copy import copy
from datetime import datetime
from itertools import takewhile
import json
from json.decoder import JSONDecodeError
import re
from typing import Any, Literal, Optional, SupportsFloat, Type, TypeAlias, Union
import dateutil.parser
from dateutil.relativedelta import relativedelta
import numpy as np
import numpy.typing as nptyping
from .exception import ParameterValueFormatError
# Defaulting to seconds precision in numpy.
NUMPY_DATETIME_DTYPE = "datetime64[s]"
NUMPY_DATETIME64_UNIT = "s"
# Default start time guess, actual value not currently given in the JSON specification.
TIME_SERIES_DEFAULT_START = "0001-01-01T00:00:00"
# Default resolution if it is omitted from the index entry.
TIME_SERIES_DEFAULT_RESOLUTION = "1h"
# Default unit if resolution is given as a number instead of a string.
_TIME_SERIES_PLAIN_INDEX_UNIT = "m"
FLOAT_VALUE_TYPE = "float"
BOOLEAN_VALUE_TYPE = "bool"
STRING_VALUE_TYPE = "str"
ConflictResolution: TypeAlias = Literal["keep", "replace", "merge"]
ConflictResolutionCallable: TypeAlias = Callable[
[tuple[bytes, Optional[str]], tuple[bytes, Optional[str]]], tuple[bytes, Optional[str]]
]
[docs]
def from_database(value: bytes, type_: Optional[str]) -> Optional[Value]:
"""
Converts a parameter value from the DB into a Python object.
Args:
value: The binary blob containing the value data from database.
type_: Value's type.
Returns:
A Python object representing the value.
"""
parsed = load_db_value(value, type_)
if isinstance(parsed, dict):
return from_dict(parsed)
if isinstance(parsed, bool):
return parsed
if isinstance(parsed, SupportsFloat):
return float(parsed)
return parsed
[docs]
def to_database(parsed_value: Optional[Value]) -> tuple[bytes, Optional[str]]:
"""
Converts a Python object representing a parameter value into its DB representation.
Args:
parsed_value: A Python object representing the value.
Returns:
The value as a binary blob and its type string.
"""
if hasattr(parsed_value, "to_database"):
return parsed_value.to_database()
db_value = json.dumps(parsed_value).encode("UTF8")
db_type = type_for_scalar(parsed_value)
return db_value, db_type
def duration_to_relativedelta(duration: str) -> relativedelta:
"""
Converts a duration to a ``relativedelta`` object.
:meta private:
Args:
duration: a duration string.
Returns:
A relativedelta object corresponding to the given duration.
"""
try:
count, abbreviation, full_unit = re.split("\\s|([a-z]|[A-Z])", duration, maxsplit=1)
count = int(count)
except ValueError as error:
raise ParameterValueFormatError(f'Could not parse duration "{duration}"') from error
unit = abbreviation if abbreviation is not None else full_unit
if unit in ["s", "second", "seconds"]:
return relativedelta(seconds=count)
if unit in ["m", "minute", "minutes"]:
return relativedelta(minutes=count)
if unit in ["h", "hour", "hours"]:
return relativedelta(hours=count)
if unit in ["D", "day", "days"]:
return relativedelta(days=count)
if unit in ["M", "month", "months"]:
return relativedelta(months=count)
if unit in ["Y", "year", "years"]:
return relativedelta(years=count)
raise ParameterValueFormatError(f'Could not parse duration "{duration}"')
def relativedelta_to_duration(delta: relativedelta) -> str:
"""
Converts a ``relativedelta`` to duration string.
:meta private:
Args:
delta: The relativedelta to convert.
Returns:
A duration string.
"""
if delta.seconds > 0:
seconds = delta.seconds
seconds += 60 * delta.minutes
seconds += 60 * 60 * delta.hours
seconds += 60 * 60 * 24 * delta.days
# Skipping months and years since dateutil does not use them here
# and they wouldn't make much sense anyway.
return f"{seconds}s"
if delta.minutes > 0:
minutes = delta.minutes
minutes += 60 * delta.hours
minutes += 60 * 24 * delta.days
return f"{minutes}m"
if delta.hours > 0:
hours = delta.hours
hours += 24 * delta.days
return f"{hours}h"
if delta.days > 0:
return f"{delta.days}D"
if delta.months > 0:
months = delta.months
months += 12 * delta.years
return f"{months}M"
if delta.years > 0:
return f"{delta.years}Y"
return "0h"
JSONValue = Union[bool, float, str, dict]
def load_db_value(db_value: bytes, type_: Optional[str]) -> Optional[JSONValue]:
"""
Parses a binary blob into a JSON object.
If the result is a dict, adds the "type" property to it.
:meta private:
Args:
db_value: The binary blob.
type_: The value type.
Returns:
The parsed parameter value.
"""
if db_value is None:
return None
try:
parsed = json.loads(db_value)
except JSONDecodeError as err:
raise ParameterValueFormatError(f"Could not decode the value: {err}") from err
if isinstance(parsed, dict):
parsed["type"] = type_
return parsed
def dump_db_value(parsed_value: JSONValue) -> tuple[bytes, str]:
"""
Unparses a JSON object into a binary blob and type string.
If the given object is a dict, extracts the "type" property from it.
:meta private:
Args:
parsed_value: A JSON object, typically obtained by calling :func:`load_db_value`.
Returns:
database representation (value and type).
"""
value_type = parsed_value["type"] if isinstance(parsed_value, dict) else type_for_scalar(parsed_value)
db_value = json.dumps(parsed_value).encode("UTF8")
return db_value, value_type
def from_database_to_single_value(database_value: bytes, value_type: Optional[str]) -> Union[str, Optional[Value]]:
"""
Same as :func:`from_database`, but in the case of indexed types returns just the type as a string.
:meta private:
Args:
database_value: the database value
value_type: the value type
Returns:
the encoded parameter value or its type.
"""
if value_type is None or value_type not in NON_ZERO_RANK_TYPES:
return from_database(database_value, value_type)
return value_type
def from_database_to_dimension_count(database_value: bytes, value_type: Optional[str]) -> int:
"""
Counts the dimensions in a database representation of a parameter value (value and type).
:meta private:
Args:
database_value: the database value
value_type: the value type
Returns:
number of dimensions
"""
if value_type in RANK_1_TYPES:
return 1
if value_type == Map.TYPE:
parsed = load_db_value(database_value, value_type)
if "rank" in parsed:
return parsed["rank"]
map_value = from_dict(parsed)
return map_dimensions(map_value)
return 0
def from_dict(value: dict) -> Optional[Value]:
"""
Converts a dictionary representation of a parameter value into an encoded parameter value.
:meta private:
Args:
value: the value dictionary including the "type" key.
Returns:
the encoded parameter value.
"""
value_type = value["type"]
try:
if value_type == DateTime.TYPE:
return _datetime_from_database(value["data"])
if value_type == Duration.TYPE:
return _duration_from_database(value["data"])
if value_type == Map.TYPE:
return _map_from_database(value)
if value_type == TimePattern.TYPE:
return _time_pattern_from_database(value)
if value_type == TimeSeries.TYPE:
return _time_series_from_database(value)
if value_type == Array.TYPE:
return _array_from_database(value)
raise ParameterValueFormatError(f'Unknown or non-dictionary parameter value type "{value_type}"')
except KeyError as error:
raise ParameterValueFormatError(f'"{error.args[0]}" is missing in the parameter value description') from error
def merge(value: tuple[bytes, Optional[str]], other: tuple[bytes, Optional[str]]) -> tuple[bytes, Optional[str]]:
"""Merges the DB representation of two parameter values.
:meta private:
Args:
value: recipient value and type.
other: other value and type.
Returns:
the DB representation of the merged value.
"""
parsed_value = from_database(*value)
if not hasattr(parsed_value, "merge"):
return value
parsed_other = from_database(*other)
return to_database(parsed_value.merge(parsed_other))
def merge_parsed(parsed_value: Optional[Value], parsed_other: Optional[Value]) -> Optional[Value]:
if not hasattr(parsed_value, "merge"):
return parsed_value
return parsed_value.merge(parsed_other)
_MERGE_FUNCTIONS: dict[ConflictResolution:ConflictResolutionCallable] = {
"keep": lambda new, old: old,
"replace": lambda new, old: new,
"merge": merge,
}
def get_conflict_fixer(
on_conflict: ConflictResolution,
) -> ConflictResolutionCallable:
"""
:meta private:
Returns parameter value conflict resolution function.
Args:
on_conflict: resolution action name
Returns:
conflict resolution function
"""
try:
return _MERGE_FUNCTIONS[on_conflict]
except KeyError:
raise RuntimeError(
f"Invalid conflict resolution strategy {on_conflict}, valid strategies are {', '.join(_MERGE_FUNCTIONS)}"
)
def _break_dictionary(data: dict) -> tuple[list, nptyping.NDArray]:
"""Converts {"index": value} style dictionary into (list(indexes), numpy.ndarray(values)) tuple."""
if not isinstance(data, dict):
raise ParameterValueFormatError(
f"expected data to be in dictionary format, instead got '{type(data).__name__}'"
)
indexes, values = zip(*data.items())
return list(indexes), np.array(values)
def _datetime_from_database(value: str) -> DateTime:
"""Converts a datetime database value into a DateTime object."""
try:
stamp = datetime.fromisoformat(value)
except ValueError:
try:
stamp = dateutil.parser.parse(value)
except ValueError as error:
raise ParameterValueFormatError(f'Could not parse datetime from "{value}"') from error
return DateTime(stamp)
def _duration_from_database(value: Union[int, str]) -> Union[Array, Duration]:
"""
Converts a duration database value into a Duration object.
The deprecated 'variable durations' will be converted to Arrays.
"""
if isinstance(value, (str, int)):
# Set default unit to minutes if value is a plain number.
if not isinstance(value, str):
value = f"{value}m"
elif isinstance(value, Sequence):
# This type of 'variable duration' is deprecated. We make an Array instead.
# Set default unit to minutes for plain numbers in value.
value = [v if isinstance(v, str) else f"{v}m" for v in value]
return Array([Duration(v) for v in value])
else:
raise ParameterValueFormatError("Duration value is of unsupported type")
return Duration(value)
def _time_series_from_database(value_dict: dict) -> TimeSeries:
"""Converts a time series database value into a time series object.
Args:
value_dict: time series dictionary
Returns:
restored time series
"""
data = value_dict["data"]
if isinstance(data, dict):
return _time_series_from_dictionary(value_dict)
if isinstance(data, list):
if isinstance(data[0], Sequence):
return _time_series_from_two_columns(value_dict)
return _time_series_from_single_column(value_dict)
raise ParameterValueFormatError("Unrecognized time series format")
def _variable_resolution_time_series_info_from_index(value: dict) -> tuple[bool, bool]:
"""Returns ignore_year and repeat from index if present or their default values."""
if "index" in value:
data_index = value["index"]
try:
ignore_year = bool(data_index.get("ignore_year", False))
except ValueError as error:
raise ParameterValueFormatError(
f'Could not decode ignore_year from "{data_index["ignore_year"]}"'
) from error
try:
repeat = bool(data_index.get("repeat", False))
except ValueError as error:
raise ParameterValueFormatError(f'Could not decode repeat from "{data_index["repeat"]}"') from error
else:
ignore_year = False
repeat = False
return ignore_year, repeat
def _time_series_from_dictionary(value_dict: dict) -> TimeSeriesVariableResolution:
"""Converts a dictionary style time series into a TimeSeriesVariableResolution object.
Args:
value_dict: time series dictionary
Returns:
restored time series
"""
data = value_dict["data"]
stamps = []
values = np.empty(len(data))
for index, (stamp, series_value) in enumerate(data.items()):
try:
stamp = np.datetime64(stamp, NUMPY_DATETIME64_UNIT)
except ValueError as error:
raise ParameterValueFormatError(f'Could not decode time stamp "{stamp}"') from error
stamps.append(stamp)
values[index] = series_value
stamps = np.array(stamps)
ignore_year, repeat = _variable_resolution_time_series_info_from_index(value_dict)
return TimeSeriesVariableResolution(stamps, values, ignore_year, repeat, value_dict.get("index_name", ""))
def _time_series_from_single_column(value_dict: dict) -> TimeSeriesFixedResolution:
"""Converts a time series dictionary into a TimeSeriesFixedResolution object.
Args:
value_dict: time series dictionary
Returns:
restored time series
"""
if "index" in value_dict:
value_index = value_dict["index"]
start = value_index["start"] if "start" in value_index else TIME_SERIES_DEFAULT_START
resolution = value_index["resolution"] if "resolution" in value_index else TIME_SERIES_DEFAULT_RESOLUTION
if "ignore_year" in value_index:
try:
ignore_year = bool(value_index["ignore_year"])
except ValueError as error:
raise ParameterValueFormatError(
f'Could not decode ignore_year value "{value_index["ignore_year"]}"'
) from error
else:
ignore_year = "start" not in value_index
if "repeat" in value_index:
try:
repeat = bool(value_index["repeat"])
except ValueError as error:
raise ParameterValueFormatError(
f'Could not decode repeat value "{value_index["ignore_year"]}"'
) from error
else:
repeat = "start" not in value_index
else:
start = TIME_SERIES_DEFAULT_START
resolution = TIME_SERIES_DEFAULT_RESOLUTION
ignore_year = True
repeat = True
if isinstance(resolution, str) or not isinstance(resolution, Sequence):
# Always work with lists to simplify the code.
resolution = [resolution]
relativedeltas = []
for duration in resolution:
if not isinstance(duration, str):
duration = str(duration) + _TIME_SERIES_PLAIN_INDEX_UNIT
relativedeltas.append(duration_to_relativedelta(duration))
try:
start = datetime.fromisoformat(start)
except ValueError:
try:
start = dateutil.parser.parse(start)
except ValueError as error:
raise ParameterValueFormatError(f'Could not decode start value "{start}"') from error
values = np.array(value_dict["data"])
return TimeSeriesFixedResolution(
start, relativedeltas, values, ignore_year, repeat, value_dict.get("index_name", "")
)
def _time_series_from_two_columns(value_dict: dict) -> TimeSeriesVariableResolution:
"""Converts a two column style time series into a TimeSeriesVariableResolution object.
Args:
value_dict: time series dictionary
Returns:
restored time series
"""
data = value_dict["data"]
stamps = []
values = np.empty(len(data))
for index, element in enumerate(data):
if not isinstance(element, Sequence) or len(element) != 2:
raise ParameterValueFormatError("Invalid value in time series array")
try:
stamp = np.datetime64(element[0], NUMPY_DATETIME64_UNIT)
except ValueError as error:
raise ParameterValueFormatError(f'Could not decode time stamp "{element[0]}"') from error
stamps.append(stamp)
values[index] = element[1]
stamps = np.array(stamps)
ignore_year, repeat = _variable_resolution_time_series_info_from_index(value_dict)
return TimeSeriesVariableResolution(stamps, values, ignore_year, repeat, value_dict.get("index_name", ""))
def _time_pattern_from_database(value_dict: dict) -> TimePattern:
"""Converts a time pattern database value into a TimePattern object.
Args:
value_dict: time pattern dictionary
Returns:
restored time pattern
"""
patterns, values = _break_dictionary(value_dict["data"])
return TimePattern(patterns, values, value_dict.get("index_name", TimePattern.DEFAULT_INDEX_NAME))
def _map_from_database(value_dict: dict) -> Map:
"""Converts a map from its database representation to a Map object.
Args:
value_dict: Map dictionary
Returns:
restored Map
"""
index_type = _map_index_type_from_database(value_dict["index_type"])
index_name = value_dict.get("index_name", Map.DEFAULT_INDEX_NAME)
data = value_dict["data"]
if isinstance(data, dict):
indexes = _map_indexes_from_database(data.keys(), index_type)
values = _map_values_from_database(data.values())
elif isinstance(data, Sequence):
if not data:
indexes = []
values = []
else:
indexes_in_db = []
values_in_db = []
for row in data:
if not isinstance(row, Sequence) or len(row) != 2:
raise ParameterValueFormatError('"data" is not a nested two column array.')
indexes_in_db.append(row[0])
values_in_db.append(row[1])
indexes = _map_indexes_from_database(indexes_in_db, index_type)
values = _map_values_from_database(values_in_db)
else:
raise ParameterValueFormatError('"data" attribute is not a dict or array.')
return Map(indexes, values, index_type, index_name)
def _map_index_type_from_database(index_type_in_db: str) -> MapIndexType:
"""Returns the type corresponding to index_type string."""
try:
return _MAP_INDEX_TYPES[index_type_in_db]
except KeyError:
raise ParameterValueFormatError(f'Unknown index_type "{index_type_in_db}".')
def _map_index_type_to_database(index_type: MapIndexType) -> str:
"""Returns the string corresponding to given index type."""
if issubclass(index_type, str):
return STRING_VALUE_TYPE
if issubclass(index_type, float):
return FLOAT_VALUE_TYPE
if index_type == DateTime:
return DateTime.TYPE
if index_type == Duration:
return Duration.TYPE
raise ParameterValueFormatError(f'Unknown index type "{index_type.__name__}".')
def _map_indexes_from_database(indexes_in_db: Iterable[Union[float, str]], index_type: MapIndexType) -> list[MapIndex]:
"""Converts map's indexes from their database format."""
try:
indexes = [index_type(index) for index in indexes_in_db]
except ValueError as error:
raise ParameterValueFormatError(
f'Failed to read index of type "{_map_index_type_to_database(index_type)}": {error}'
) from error
return indexes
def _map_index_to_database(index: MapIndex) -> Union[float, str]:
"""Converts a single map index to database format."""
if hasattr(index, "value_to_database_data"):
return index.value_to_database_data()
return index
def _map_value_to_database(value: Value) -> tuple[Optional[Union[bool, float, str, dict]], int]:
"""Converts a single map value to database format."""
if hasattr(value, "to_dict"):
value_type = value.TYPE
value_dict = value.to_dict()
if value_type == "map":
rank = value_dict["rank"]
elif value_type in RANK_1_TYPES:
rank = 1
else:
rank = 0
return {"type": value.TYPE, **value.to_dict()}, rank
return value, 0
def _map_values_from_database(values_in_db: Iterable[Optional[Union[bool, float, str, dict]]]) -> list[Value]:
"""Converts map's values from their database format."""
if not values_in_db:
return []
values = []
for value_in_db in values_in_db:
value = from_dict(value_in_db) if isinstance(value_in_db, dict) else value_in_db
if isinstance(value, int):
value = float(value)
elif value is not None and not isinstance(value, (float, bool, Duration, IndexedValue, str, DateTime)):
raise ParameterValueFormatError(f'Unsupported value type for Map: "{type(value).__name__}".')
values.append(value)
return values
def _array_from_database(value_dict: dict) -> Array:
"""Converts a value dictionary to Array.
Args:
value_dict: array dictionary
Returns:
Array value
"""
value_type_id = value_dict.get("value_type", FLOAT_VALUE_TYPE)
try:
value_type = {
FLOAT_VALUE_TYPE: float,
STRING_VALUE_TYPE: str,
DateTime.TYPE: DateTime,
Duration.TYPE: Duration,
}[value_type_id]
except KeyError:
raise ParameterValueFormatError(f'Unsupported value type for Array: "{value_type_id}".')
try:
data = [value_type(x) for x in value_dict["data"]]
except (TypeError, ParameterValueFormatError) as error:
raise ParameterValueFormatError(f"Failed to read values for Array: {error}") from error
index_name = value_dict.get("index_name", Array.DEFAULT_INDEX_NAME)
return Array(data, value_type, index_name)
[docs]
class ParameterValue:
"""Base for all classes representing parameter values."""
VALUE_TYPE: str = NotImplemented
TYPE: str = NotImplemented
def to_dict(self) -> dict:
"""Returns a dictionary representation of this parameter value.
:meta private:
Returns:
A dictionary including the "type" key.
"""
raise NotImplementedError()
@classmethod
[docs]
def type_(cls) -> str:
"""Returns the type of the parameter value represented by this object."""
return cls.TYPE
[docs]
def to_database(self) -> tuple[bytes, str]:
"""Returns the DB representation of this object. Equivalent to calling :func:`to_database` with it.
Returns:
The `value` and `type` fields that would go in the ``parameter_value`` table.
"""
return json.dumps(self.to_dict()).encode("UTF8"), self.TYPE
[docs]
class DateTime(ParameterValue):
"""A parameter value of type 'date_time'. A point in time."""
VALUE_TYPE = "single value"
TYPE = "date_time"
def __init__(self, value: Optional[Union[str, DateTime, datetime]] = None):
"""
Args:
The `date_time` value.
"""
if value is None:
value = datetime(year=2000, month=1, day=1)
elif isinstance(value, str):
try:
value = datetime.fromisoformat(value)
except ValueError:
try:
value = dateutil.parser.parse(value)
except ValueError as error:
raise ParameterValueFormatError(f'Could not parse datetime from "{value}"') from error
elif isinstance(value, DateTime):
value = copy(value._value)
elif not isinstance(value, datetime):
raise ParameterValueFormatError(f'"{type(value).__name__}" cannot be converted to DateTime.')
self._value = value
def __eq__(self, other):
if not isinstance(other, DateTime):
return NotImplemented
return self._value == other._value
def __lt__(self, other):
if not isinstance(other, DateTime):
return NotImplemented
return self._value < other._value
def __hash__(self):
return hash(self._value)
def __str__(self):
return self._value.isoformat()
def value_to_database_data(self) -> str:
"""Returns the database representation of the datetime.
:meta private:
"""
return self._value.isoformat()
def to_dict(self) -> dict:
return {"data": self.value_to_database_data()}
@property
def value(self) -> datetime:
return self._value
[docs]
class Duration(ParameterValue):
"""
A parameter value of type 'duration'. An extension of time.
"""
VALUE_TYPE = "single value"
TYPE = "duration"
def __init__(self, value: Optional[Union[str, relativedelta, Duration]] = None):
"""
Args:
value: the `duration` value.
"""
if value is None:
value = relativedelta(hours=1)
elif isinstance(value, str):
value = duration_to_relativedelta(value)
elif isinstance(value, Duration):
value = copy(value._value)
elif isinstance(value, relativedelta):
value = value.normalized()
else:
raise ParameterValueFormatError(f'Could not parse duration from "{value}"')
self._value = value
def __eq__(self, other):
if not isinstance(other, Duration):
return NotImplemented
return self._value == other._value
def __hash__(self):
return hash(self._value)
def __str__(self):
return str(relativedelta_to_duration(self._value))
def value_to_database_data(self) -> str:
"""Returns the 'data' property of this object's database representation.
:meta private:
"""
return relativedelta_to_duration(self._value)
def to_dict(self) -> dict:
return {"data": self.value_to_database_data()}
@property
def value(self) -> relativedelta:
return self._value
ScalarValue = Union[bool, float, str, DateTime, Duration]
class _Indexes(np.ndarray):
"""
A subclass of numpy.ndarray that keeps a lookup dictionary from elements to positions.
Used by methods get_value and set_value of IndexedValue, to avoid something like
position = indexes.index(element)
which might be too slow compared to dictionary lookup.
"""
def __new__(cls, other, dtype=None):
obj = np.asarray(other, dtype=dtype).view(cls)
obj.position_lookup = {index: k for k, index in enumerate(other)}
return obj
def __array_finalize__(self, obj):
if obj is None:
return
# pylint: disable=attribute-defined-outside-init
self.position_lookup = getattr(obj, "position_lookup", {})
def __setitem__(self, position, index):
old_index = self.__getitem__(position)
self.position_lookup[index] = self.position_lookup.pop(old_index, "")
super().__setitem__(position, index)
def __eq__(self, other):
return np.array_equal(self, other)
def __bool__(self):
return np.size(self) != 0
[docs]
class IndexedValue(ParameterValue):
"""
Base for all classes representing indexed parameter values.
"""
DEFAULT_INDEX_NAME = NotImplemented
def __init__(self, values: Sequence[Any], value_type: Optional[Type] = None, index_name: str = ""):
"""
:meta private:
Args:
values: values
value_type: type of values
index_name: a label for the index.
"""
self._value_type = value_type
self._indexes: Optional[_Indexes] = None
self._values: Optional[nptyping.NDArray[Any]] = None
self.values = values
self.index_name = index_name if index_name else self.DEFAULT_INDEX_NAME
def __bool__(self):
# NOTE: Use self.indexes rather than self._indexes, otherwise TimeSeriesFixedResolution gives wrong result
return bool(self.indexes)
def __len__(self):
return len(self.indexes)
@property
[docs]
def indexes(self) -> nptyping.NDArray:
"""The indexes."""
return self._indexes
@indexes.setter
def indexes(self, indexes: nptyping.NDArray) -> None:
"""Sets the indexes."""
self._indexes = _Indexes(indexes)
@property
[docs]
def values(self) -> Sequence[Any]:
"""The values."""
return self._values
@values.setter
def values(self, values: Sequence[Any]) -> None:
"""Sets the values."""
if isinstance(self._value_type, np.dtype) and (
not isinstance(values, np.ndarray) or not values.dtype == self._value_type
):
values = np.array(values, dtype=self._value_type)
self._values = values
@property
[docs]
def value_type(self) -> Type:
"""The type of the values."""
return self._value_type
[docs]
def get_nearest(self, index: Any) -> Any:
"""Returns the value at the nearest index to the given one.
Args:
index: The index.
Returns:
The value.
"""
pos = np.searchsorted(self.indexes, index)
return self._values[pos]
[docs]
def get_value(self, index: Any) -> Any:
"""Returns the value at the given index.
Args:
index: The index.
Returns:
The value.
"""
pos = self._indexes.position_lookup.get(index)
if pos is None:
return None
return self._values[pos]
[docs]
def set_value(self, index, value):
"""Sets the value at the given index.
Args:
index (any): The index.
value (any): The value.
"""
pos = self._indexes.position_lookup.get(index)
if pos is not None:
self._values[pos] = value
def to_dict(self):
raise NotImplementedError()
def merge(self, other):
if not isinstance(other, type(self)):
return self
if self.indexes and not isinstance(self.indexes[0], str):
new_indexes = np.unique(np.concatenate((self.indexes, other.indexes)))
else:
# Avoid sorting when indices are arbitrary strings
existing = set(self.indexes)
additional = [x for x in other.indexes if x not in existing]
new_indexes = np.concatenate((additional, self.indexes))
def _merge(value, other):
return other if value is None else merge_parsed(value, other)
new_values = [_merge(self.get_value(index), other.get_value(index)) for index in new_indexes]
self.indexes = new_indexes
self.values = new_values
return self
Value = Union[ScalarValue, IndexedValue]
[docs]
class Array(IndexedValue):
"""A parameter value of type 'array'. A one dimensional array with zero based indexing."""
VALUE_TYPE = "array"
TYPE = "array"
DEFAULT_INDEX_NAME = "i"
def __init__(self, values: Sequence[ScalarValue], value_type: Optional[Type] = None, index_name: str = ""):
"""
Args:
values: the array values.
value_type: the type of the values; if not given, it will be deduced from `values`.
Defaults to float if `values` is empty.
index_name: the name you would give to the array index in your application.
"""
if value_type is None:
value_type = type(values[0]) if values else float
if value_type == int:
value_type = float
try:
values = [value_type(x) for x in values]
except ValueError as error:
raise ParameterValueFormatError("Cannot convert array's values to float.") from error
if not all(isinstance(x, value_type) for x in values):
try:
values = [value_type(x) for x in values]
except ValueError as error:
raise ParameterValueFormatError("Not all array's values are of the same type.") from error
super().__init__(values, value_type=value_type, index_name=index_name)
[docs]
self.indexes = range(len(values))
def __eq__(self, other):
if not isinstance(other, Array):
return NotImplemented
try:
return np.array_equal(self._values, other._values, equal_nan=True) and self.index_name == other.index_name
except TypeError:
return np.array_equal(self._values, other._values) and self.index_name == other.index_name
def to_dict(self):
try:
value_type_id = {
float: FLOAT_VALUE_TYPE,
str: STRING_VALUE_TYPE, # String could also mean time_period but we don't have any way to distinguish that, yet.
DateTime: DateTime.TYPE,
Duration: Duration.TYPE,
}[self._value_type]
except KeyError:
raise ParameterValueFormatError(f"Cannot write unsupported array value type: {self._value_type.__name__}")
if value_type_id in (FLOAT_VALUE_TYPE, STRING_VALUE_TYPE):
data = self._values
else:
data = [x.value_to_database_data() for x in self._values]
value_dict = {"value_type": value_type_id, "data": data}
if self.index_name != self.DEFAULT_INDEX_NAME:
value_dict["index_name"] = self.index_name
return value_dict
class _TimePatternIndexes(_Indexes):
"""An array of *checked* time pattern indexes."""
_VALID_UNITS = re.compile(r"(Y|M|D|WD|h|m|s)")
@staticmethod
def _check_index(union_str: str) -> None:
"""
Checks if a time pattern index has the right format.
Args:
union_str: The time pattern index to check. Generally assumed to be a union of interval intersections.
Raises:
ParameterValueFormatError: If the given string doesn't comply with time pattern spec.
"""
if not union_str:
# We accept empty strings so we can add empty rows in the parameter value editor UI
return
union_dlm = ","
intersection_dlm = ";"
range_dlm = "-"
for intersection_str in union_str.split(union_dlm):
for interval_str in intersection_str.split(intersection_dlm):
m = _TimePatternIndexes._VALID_UNITS.match(interval_str)
if m is None:
raise ParameterValueFormatError(
f"Invalid interval {interval_str}, it should start with either Y, M, D, WD, h, m, or s."
)
key = m.group(0)
lower_upper_str = interval_str[len(key) :]
lower_upper = lower_upper_str.split(range_dlm)
if len(lower_upper) == 1:
value_str = lower_upper[0]
try:
int(value_str)
except Exception as error:
raise ParameterValueFormatError(f"Invalid value {value_str}, must be an integer.") from error
continue
if len(lower_upper) != 2:
raise ParameterValueFormatError(
f"Invalid interval bounds {lower_upper_str}, it should be one integer or two integers separated by dash (-)."
)
lower_str, upper_str = lower_upper
try:
lower = int(lower_str)
except Exception as error:
raise ParameterValueFormatError(f"Invalid lower bound {lower_str}, must be an integer.") from error
try:
upper = int(upper_str)
except Exception as error:
raise ParameterValueFormatError(f"Invalid upper bound {upper_str}, must be an integer.") from error
if lower > upper:
raise ParameterValueFormatError(f"Lower bound {lower} can't be higher than upper bound {upper}.")
def __array_finalize__(self, obj):
"""Checks indexes when building the array."""
if obj is not None:
for x in obj:
self._check_index(x)
super().__array_finalize__(obj)
def __eq__(self, other):
return list(self) == list(other)
def __setitem__(self, position, index):
"""Checks indexes when setting and item."""
self._check_index(index)
super().__setitem__(position, index)
[docs]
class TimePattern(IndexedValue):
"""A parameter value of type 'time_pattern'.
A mapping from time patterns strings to numerical values.
"""
VALUE_TYPE = "time pattern"
TYPE = "time_pattern"
DEFAULT_INDEX_NAME = "p"
def __init__(self, indexes: list[str], values: Sequence[float], index_name: str = ""):
"""
Args:
indexes: the time pattern strings.
values: the values associated to different patterns.
index_name: index name
"""
if len(indexes) != len(values):
raise ParameterValueFormatError("Length of values does not match length of indexes")
if not indexes:
raise ParameterValueFormatError("Empty time pattern not allowed")
super().__init__(values, value_type=np.dtype(float), index_name=index_name)
def __eq__(self, other):
if not isinstance(other, TimePattern):
return NotImplemented
return (
self._indexes == other._indexes
and np.all(self._values == other._values)
and self.index_name == other.index_name
)
@IndexedValue.indexes.setter
def indexes(self, indexes):
self._indexes = _TimePatternIndexes(indexes, dtype=np.object_)
def to_dict(self):
value_dict = {"data": dict(zip(self._indexes, self._values))}
if self.index_name != self.DEFAULT_INDEX_NAME:
value_dict["index_name"] = self.index_name
return value_dict
[docs]
class TimeSeries(IndexedValue):
"""Base for all classes representing 'time_series' parameter values."""
VALUE_TYPE = "time series"
TYPE = "time_series"
DEFAULT_INDEX_NAME = "t"
def __init__(self, values: Sequence[float], ignore_year: bool, repeat: bool, index_name: str = ""):
"""
:meta private:
Args:
values (Sequence): the values in the time-series.
ignore_year (bool): True if the year should be ignored.
repeat (bool): True if the series is repeating.
index_name (str): index name.
"""
if len(values) < 1:
raise ParameterValueFormatError("Time series too short. Must have one or more values")
super().__init__(values, value_type=np.dtype(float), index_name=index_name)
self._ignore_year = ignore_year
self._repeat = repeat
def __len__(self):
return len(self._values)
@property
[docs]
def ignore_year(self):
"""Whether the year should be ignored.
Returns:
bool:
"""
return self._ignore_year
@ignore_year.setter
def ignore_year(self, ignore_year):
"""Sets the ignore_year property.
Args:
bool: new value.
"""
self._ignore_year = bool(ignore_year)
@property
[docs]
def repeat(self):
"""Whether the series is repeating.
Returns:
bool:
"""
return self._repeat
@repeat.setter
def repeat(self, repeat):
"""Sets the repeat property.
Args:
bool: new value.
"""
self._repeat = bool(repeat)
def to_dict(self):
raise NotImplementedError()
[docs]
class TimeSeriesFixedResolution(TimeSeries):
"""
A parameter value of type 'time_series'.
A mapping from time stamps to numerical values, with fixed durations between the time stamps.
When getting the indexes the durations are applied cyclically.
Currently, there is no support for the `ignore_year` and `repeat` options
other than having getters for their values.
"""
_memoized_indexes: dict[tuple[np.datetime64, tuple[relativedelta, ...], int], nptyping.NDArray[np.datetime64]] = {}
def __init__(
self,
start: Union[str, datetime, np.datetime64],
resolution: Union[str, relativedelta, list[Union[str, relativedelta]]],
values: Sequence[float],
ignore_year: bool,
repeat: bool,
index_name: str = "",
):
"""
Args:
start: the first time stamp
resolution: duration(s) between the time stamps.
values: the values in the time-series.
ignore_year: True if the year should be ignored.
repeat: True if the series is repeating.
index_name: index name.
"""
super().__init__(values, ignore_year, repeat, index_name)
self._start = None
self._resolution = None
self.start = start
self.resolution = resolution
def __eq__(self, other):
if not isinstance(other, TimeSeriesFixedResolution):
return NotImplemented
return (
self._start == other._start
and self._resolution == other._resolution
and np.array_equal(self._values, other._values, equal_nan=True)
and self._ignore_year == other._ignore_year
and self._repeat == other._repeat
and self.index_name == other.index_name
)
def _get_memoized_indexes(self) -> nptyping.NDArray[np.datetime64]:
key = (self.start, tuple(self.resolution), len(self))
memoized_indexes = self._memoized_indexes.get(key)
if memoized_indexes is not None:
return memoized_indexes
cycle_count = -(-len(self) // len(self.resolution))
resolution = (cycle_count * self.resolution)[: len(self) - 1]
resolution.insert(0, self._start)
resolution_arr = np.array(resolution)
memoized_indexes = self._memoized_indexes[key] = resolution_arr.cumsum().astype(NUMPY_DATETIME_DTYPE)
return memoized_indexes
@property
[docs]
def indexes(self):
if self._indexes is None:
self.indexes = self._get_memoized_indexes()
return IndexedValue.indexes.fget(self)
@indexes.setter
def indexes(self, indexes):
# Needed because we redefine the setter
self._indexes = _Indexes(indexes)
@property
[docs]
def start(self) -> np.datetime64:
"""Returns the start index."""
return self._start
@start.setter
def start(self, start: Union[str, datetime, np.datetime64]):
"""Sets the start index."""
if isinstance(start, str):
try:
self._start = datetime.fromisoformat(start)
except ValueError:
try:
self._start = dateutil.parser.parse(start)
except ValueError as error:
raise ParameterValueFormatError(f'Cannot parse start time "{start}"') from error
elif isinstance(start, np.datetime64):
self._start = start.tolist()
else:
self._start = start
self._indexes = None
@property
[docs]
def resolution(self) -> list[relativedelta]:
"""Returns the resolution as list of durations."""
return self._resolution
@resolution.setter
def resolution(self, resolution: Union[str, relativedelta, Sequence[Union[str, relativedelta]]]) -> None:
"""Sets the resolution."""
if isinstance(resolution, str):
resolution = [duration_to_relativedelta(resolution)]
elif not isinstance(resolution, Sequence):
resolution = [resolution]
else:
for i, r in enumerate(resolution):
if isinstance(r, str):
resolution[i] = duration_to_relativedelta(r)
if not resolution:
raise ParameterValueFormatError("Resolution cannot be zero.")
self._resolution = resolution
self._indexes = None
def to_dict(self):
if len(self._resolution) > 1:
resolution_as_json = [relativedelta_to_duration(step) for step in self._resolution]
else:
resolution_as_json = relativedelta_to_duration(self._resolution[0])
value_dict = {
"index": {
"start": str(self._start),
"resolution": resolution_as_json,
"ignore_year": self._ignore_year,
"repeat": self._repeat,
},
"data": self._values.tolist(),
}
if self.index_name != self.DEFAULT_INDEX_NAME:
value_dict["index_name"] = self.index_name
return value_dict
[docs]
def get_value(self, index: np.datetime64) -> Optional[np.float64]:
"""Returns the value at the given time stamp."""
pos = self.indexes.position_lookup.get(index)
if pos is None:
return None
return self._values[pos]
[docs]
def set_value(self, index: np.datetime64, value: float) -> None:
"""Sets the value at the given index."""
pos = self.indexes.position_lookup.get(index)
if pos is not None:
self._values[pos] = value
[docs]
class TimeSeriesVariableResolution(TimeSeries):
"""A parameter value of type 'time_series'.
A mapping from time stamps to numerical values with arbitrary time steps.
"""
def __init__(
self,
indexes: Sequence[Union[str, datetime, np.datetime64, DateTime]],
values: Sequence[float],
ignore_year: bool,
repeat: bool,
index_name: str = "",
):
"""
Args:
indexes: the time stamps.
values: the value for each time stamp.
ignore_year: True if the year should be ignored.
repeat: True if the series is repeating.
index_name: index name.
"""
super().__init__(values, ignore_year, repeat, index_name)
if len(indexes) != len(values):
raise ParameterValueFormatError("Length of values does not match length of indexes")
if not isinstance(indexes, np.ndarray):
date_times = np.empty(len(indexes), dtype=NUMPY_DATETIME_DTYPE)
for i, index in enumerate(indexes):
if isinstance(index, DateTime):
date_times[i] = np.datetime64(index.value, NUMPY_DATETIME64_UNIT)
else:
try:
date_times[i] = np.datetime64(index, NUMPY_DATETIME64_UNIT)
except ValueError as error:
raise ParameterValueFormatError(
f'Cannot convert "{index}" of type {type(index).__name__} to time stamp.'
) from error
indexes = date_times
def __eq__(self, other):
if not isinstance(other, TimeSeriesVariableResolution):
return NotImplemented
return (
np.array_equal(self._indexes, other._indexes)
and np.array_equal(self._values, other._values, equal_nan=True)
and self._ignore_year == other._ignore_year
and self._repeat == other._repeat
and self.index_name == other.index_name
)
def to_dict(self):
value_dict = {}
value_dict["data"] = {str(index): float(value) for index, value in zip(self._indexes, self._values)}
# Add "index" entry only if its contents are not set to their default values.
if self._ignore_year:
value_dict.setdefault("index", {})["ignore_year"] = self._ignore_year
if self._repeat:
value_dict.setdefault("index", {})["repeat"] = self._repeat
if self.index_name != self.DEFAULT_INDEX_NAME:
value_dict["index_name"] = self.index_name
return value_dict
[docs]
class Map(IndexedValue):
"""A parameter value of type 'map'. A mapping from key to value, where the values can be other instances
of :class:`ParameterValue`.
"""
VALUE_TYPE = "map"
TYPE = "map"
DEFAULT_INDEX_NAME = "x"
def __init__(
self,
indexes: Sequence[MapIndex],
values: Sequence[Value],
index_type: Optional[Type] = None,
index_name: str = "",
):
"""
Args:
indexes: the indexes in the map.
values: the value for each index.
index_type: index type or None to deduce from ``indexes``.
index_name: index name.
"""
if not indexes and index_type is None:
raise ParameterValueFormatError("Cannot deduce index type from empty indexes list.")
if indexes and index_type is not None and not isinstance(indexes[0], index_type):
raise ParameterValueFormatError('Type of index does not match "index_type" argument.')
if len(indexes) != len(values):
raise ParameterValueFormatError("Length of values does not match length of indexes")
super().__init__(values, index_name=index_name)
self._index_type = index_type if index_type is not None else type(indexes[0])
self._values = values
def __eq__(self, other):
if not isinstance(other, Map):
return NotImplemented
return other._indexes == self._indexes and other._values == self._values and self.index_name == other.index_name
@property
def index_type(self) -> MapIndexType:
return self._index_type
[docs]
def is_nested(self) -> bool:
"""Whether any of the values is also a map."""
return any(isinstance(value, Map) for value in self._values)
[docs]
def value_to_database_data(self) -> tuple[list[list[Union[float, str]]], int]:
"""Returns map's database representation's 'data' dictionary."""
data = []
nested_ranks = [0]
for index, value in zip(self._indexes, self._values):
index_in_db = _map_index_to_database(index)
value_in_db, nested_rank = _map_value_to_database(value)
nested_ranks.append(nested_rank)
data.append([index_in_db, value_in_db])
return data, max(nested_ranks)
def to_dict(self):
data, nested_rank = self.value_to_database_data()
value_dict = {
"index_type": _map_index_type_to_database(self._index_type),
"rank": nested_rank + 1,
"data": data,
}
if self.index_name != self.DEFAULT_INDEX_NAME:
value_dict["index_name"] = self.index_name
return value_dict
MapIndex = Union[float, str, DateTime, Duration]
MapIndexType = Union[Type[float], Type[str], Type[DateTime], Type[Duration]]
_MAP_INDEX_TYPES = {
STRING_VALUE_TYPE: str,
DateTime.TYPE: DateTime,
Duration.TYPE: Duration,
FLOAT_VALUE_TYPE: float,
}
def map_dimensions(map_: Map) -> int:
"""Counts the dimensions in a map.
:meta private:
Args:
map_: the map to process.
Returns:
number of dimensions
"""
nested = 0
for v in map_.values:
if isinstance(v, Map):
nested = max(nested, map_dimensions(v))
elif isinstance(v, IndexedValue):
nested = max(nested, 1)
return 1 + nested
def convert_leaf_maps_to_specialized_containers(map_: Map) -> IndexedValue:
"""
Converts leafs to specialized containers.
Current conversion rules:
- If the ``index_type`` is a :class:`DateTime` and all ``values`` are float,
then the leaf is converted to a :class:`TimeSeries`.
:meta private:
Args:
map_: a map to process.
Returns:
a new map with leaves converted.
"""
converted_container = _try_convert_to_container(map_)
if converted_container is not None:
return converted_container
new_values = []
for _, value in zip(map_.indexes, map_.values):
if isinstance(value, Map):
converted = convert_leaf_maps_to_specialized_containers(value)
new_values.append(converted)
else:
new_values.append(value)
return Map(map_.indexes, new_values, index_name=map_.index_name)
def convert_containers_to_maps(value: Any) -> Union[Any, Map]:
"""
Converts indexed values into maps.
If ``value`` is a :class:`Map` then converts leaf values into maps recursively.
:meta private:
Args:
value: a value to convert.
Returns:
converted Map or value as-is if conversion was not possible
"""
if isinstance(value, Map):
if not value:
return value
new_values = []
for _, x in zip(value.indexes, value.values):
if isinstance(x, IndexedValue):
new_values.append(convert_containers_to_maps(x))
else:
new_values.append(x)
return Map(list(value.indexes), new_values, index_name=value.index_name)
if isinstance(value, IndexedValue):
if not value:
if isinstance(value, TimeSeries):
return Map([], [], DateTime, index_name=TimeSeries.DEFAULT_INDEX_NAME)
return Map([], [], str)
if isinstance(value, TimeSeries):
return Map(
[DateTime(t) for t in np.datetime_as_string(value.indexes)],
list(value.values),
index_name=value.index_name,
)
return Map(list(value.indexes), list(value.values), index_name=value.index_name)
return value
def convert_map_to_table(
map_: Map, make_square: bool = True, row_this_far: Optional[list] = None, empty: Optional[Any] = None
) -> list[list]:
"""
Converts :class:`Map` into list of rows recursively.
:meta private:
Args:
map_: map to convert.
make_square: if True, then pad rows with None so they all have the same length.
row_this_far: current row; used for recursion.
empty: object to fill empty cells with.
Returns:
map's rows
"""
if row_this_far is None:
row_this_far = []
rows = []
for index, value in zip(map_.indexes, map_.values):
if not isinstance(value, Map):
rows.append(row_this_far + [index, value])
else:
rows += convert_map_to_table(value, False, row_this_far + [index])
if make_square:
max_length = 0
for row in rows:
max_length = max(max_length, len(row))
equal_length_rows = []
for row in rows:
equal_length_row = row + (max_length - len(row)) * [empty]
equal_length_rows.append(equal_length_row)
return equal_length_rows
return rows
[docs]
def convert_map_to_dict(map_: Map) -> dict:
"""Converts a :class:`Map` to a nested dictionary."""
d = {}
for index, x in zip(map_.indexes, map_.values):
if isinstance(x, Map):
x = convert_map_to_dict(x)
d[index] = x
return d
def _try_convert_to_container(map_: Map) -> Optional[TimeSeriesVariableResolution]:
"""
Tries to convert a map to corresponding specialized container.
Args:
map_: a map to convert
Returns:
converted Map or None if the map couldn't be converted
"""
if not map_:
return None
stamps = []
values = []
for index, value in zip(map_.indexes, map_.values):
if not isinstance(index, DateTime) or not isinstance(value, float):
return None
stamps.append(index)
values.append(value)
return TimeSeriesVariableResolution(stamps, values, False, False, index_name=map_.index_name)
# Value types that are supported by spinedb_api
VALUE_TYPES: set[str] = {
FLOAT_VALUE_TYPE,
BOOLEAN_VALUE_TYPE,
STRING_VALUE_TYPE,
Duration.TYPE,
DateTime.TYPE,
Array.TYPE,
TimePattern.TYPE,
TimeSeries.TYPE,
Map.TYPE,
}
RANK_1_TYPES: set[str] = {Array.TYPE, TimePattern.TYPE, TimeSeries.TYPE}
NON_ZERO_RANK_TYPES: set[str] = RANK_1_TYPES | {Map.TYPE}
def type_and_rank_to_fancy_type(value_type: str, rank: int) -> str:
if value_type == Map.TYPE:
return f"{rank}d_{value_type}"
return value_type
def fancy_type_to_type_and_rank(fancy_type: str) -> tuple[str, int]:
if fancy_type.endswith(f"d_{Map.TYPE}"):
return Map.TYPE, int("".join(takewhile(lambda x: x.isdigit(), fancy_type)))
if fancy_type in RANK_1_TYPES:
return fancy_type, 1
return fancy_type, 0
def join_value_and_type(db_value: bytes, db_type: Optional[str]) -> str:
"""Joins database value and type into a string.
The resulting string is a JSON string.
In case of complex types (duration, date_time, time_series, time_pattern, array, map),
the type is just added as top-level key.
:meta private:
Args:
db_value: database value
db_type: value type
Returns:
parameter value as JSON with an additional ``type`` field.
"""
try:
parsed = load_db_value(db_value, db_type)
except ParameterValueFormatError:
parsed = None
return json.dumps(parsed)
def split_value_and_type(value_and_type: str) -> tuple[bytes, str]:
"""Splits the given string into value and type.
:meta private:
Args:
value_and_type: a string joining value and type, as obtained by calling :func:`join_value_and_type`.
Returns:
database value and type.
"""
try:
parsed = json.loads(value_and_type)
except (TypeError, json.JSONDecodeError):
parsed = value_and_type
return dump_db_value(parsed)
def deep_copy_value(value: Optional[Value]) -> Optional[Value]:
"""Copies a value.
The operation is deep meaning that nested Maps will be copied as well.
:meta private:
Args:
value to copy
Returns:
deep-copied value
"""
if isinstance(value, (SupportsFloat, str)) or value is None:
return value
if isinstance(value, Array):
return Array(value.values, value.value_type, value.index_name)
if isinstance(value, DateTime):
return DateTime(value)
if isinstance(value, Duration):
return Duration(value)
if isinstance(value, Map):
return deep_copy_map(value)
if isinstance(value, TimePattern):
return TimePattern(value.indexes.copy(), value.values.copy(), value.index_name)
if isinstance(value, TimeSeriesFixedResolution):
return TimeSeriesFixedResolution(
value.start, value.resolution, value.values.copy(), value.ignore_year, value.repeat, value.index_name
)
if isinstance(value, TimeSeriesVariableResolution):
return TimeSeriesVariableResolution(
value.indexes.copy(), value.values.copy(), value.ignore_year, value.repeat, value.index_name
)
raise ValueError("unknown value")
def deep_copy_map(value: Map) -> Map:
"""Deep copies a Map value.
:meta private:
Args:
value: Map to copy
Returns:
deep-copied Map
"""
xs = value.indexes.copy()
ys = [deep_copy_value(y) for y in value.values]
return Map(xs, ys, index_type=value.index_type, index_name=value.index_name)
[docs]
def type_for_value(value: Value) -> tuple[str, int]:
"""Declares value's database type and rank.
Args:
value: value to inspect
Returns:
type and rank
"""
if isinstance(value, Map):
return Map.TYPE, map_dimensions(value)
if isinstance(value, ParameterValue):
if value.TYPE in RANK_1_TYPES:
return value.TYPE, 1
return value.TYPE, 0
return type_for_scalar(value), 0
[docs]
def type_for_scalar(parsed_value: JSONValue) -> Optional[str]:
"""Declares scalar value's database type.
Args:
parsed_value : parsed scalar
Returns:
value's type
"""
if parsed_value is None:
return None
if isinstance(parsed_value, dict):
return parsed_value["type"]
if isinstance(parsed_value, bool):
return BOOLEAN_VALUE_TYPE
if isinstance(parsed_value, SupportsFloat):
return FLOAT_VALUE_TYPE
if isinstance(parsed_value, str):
return STRING_VALUE_TYPE
raise ParameterValueFormatError(f"Values of type {type(parsed_value).__name__} not supported.")
UNPARSED_NULL_VALUE: bytes = to_database(None)[0]