Source code for spinedb_api.filters.tools

######################################################################################################################
# Copyright (C) 2017-2022 Spine project consortium
# Copyright Spine Database API contributors
# This file is part of Spine Database API.
# Spine Database API is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser
# General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details. You should have received a copy of the GNU Lesser General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
######################################################################################################################
"""This module contains tools and utilities to work with filters."""

from __future__ import annotations
from collections.abc import Iterable
from itertools import dropwhile, takewhile
from json import dump, load
import sys
from typing import TYPE_CHECKING, Optional, Union
from urllib.parse import parse_qs, urlencode, urlparse
from .alternative_filter import (
    ALTERNATIVE_FILTER_SHORTHAND_TAG,
    ALTERNATIVE_FILTER_TYPE,
    alternative_filter_config,
    alternative_filter_config_to_shorthand,
    alternative_filter_from_dict,
    alternative_filter_shorthand_to_config,
    alternative_names_from_dict,
)
from .execution_filter import (
    EXECUTION_FILTER_TYPE,
    EXECUTION_SHORTHAND_TAG,
    execution_filter_config,
    execution_filter_config_to_shorthand,
    execution_filter_from_dict,
    execution_filter_shorthand_to_config,
)
from .renamer import (
    ENTITY_CLASS_RENAMER_SHORTHAND_TAG,
    ENTITY_CLASS_RENAMER_TYPE,
    PARAMETER_RENAMER_SHORTHAND_TAG,
    PARAMETER_RENAMER_TYPE,
    entity_class_renamer_config_to_shorthand,
    entity_class_renamer_from_dict,
    entity_class_renamer_shorthand_to_config,
    parameter_renamer_config_to_shorthand,
    parameter_renamer_from_dict,
    parameter_renamer_shorthand_to_config,
)
from .scenario_filter import (
    SCENARIO_FILTER_TYPE,
    SCENARIO_SHORTHAND_TAG,
    scenario_filter_config,
    scenario_filter_config_to_shorthand,
    scenario_filter_from_dict,
    scenario_filter_shorthand_to_config,
    scenario_name_from_dict,
)
from .value_transformer import (
    VALUE_TRANSFORMER_SHORTHAND_TAG,
    VALUE_TRANSFORMER_TYPE,
    value_transformer_config_to_shorthand,
    value_transformer_from_dict,
    value_transformer_shorthand_to_config,
)

if TYPE_CHECKING:
    # Imported for type annotations only; skipped at runtime.
    from ..db_mapping import DatabaseMapping

# Name of the URL query parameter under which filter configs are stored.
FILTER_IDENTIFIER = "spinedbfilter"
# Prefix that marks a filter entry in a URL as a shorthand config; entries without it are returned as-is (file paths).
SHORTHAND_TAG = "cfg:"


def apply_filter_stack(db_map: DatabaseMapping, stack: Iterable[dict]) -> None:
    """Applies the filter config dicts of ``stack``, in order, to a database map.

    The function used for each config is selected by the config's ``"type"`` entry;
    a type without a registered applier raises ``KeyError``.
    """
    appliers_by_type = {
        ALTERNATIVE_FILTER_TYPE: alternative_filter_from_dict,
        ENTITY_CLASS_RENAMER_TYPE: entity_class_renamer_from_dict,
        EXECUTION_FILTER_TYPE: execution_filter_from_dict,
        PARAMETER_RENAMER_TYPE: parameter_renamer_from_dict,
        SCENARIO_FILTER_TYPE: scenario_filter_from_dict,
        VALUE_TRANSFORMER_TYPE: value_transformer_from_dict,
    }
    # The whole stack is applied inside a single ``with db_map`` block.
    with db_map:
        for config in stack:
            apply = appliers_by_type[config["type"]]
            apply(db_map, config)
def load_filters(configs):
    """
    Loads filter configurations from disk as needed and constructs a filter stack.

    Args:
        configs (list): list of filter config dicts and paths to filter configuration files;
            paths may be given as ``str`` or as ``os.PathLike`` objects (e.g. ``pathlib.Path``)

    Returns:
        list of dict: filter stack
    """
    from os import PathLike  # function-scope import keeps the module's import block untouched

    stack = []
    for config in configs:
        if isinstance(config, (str, PathLike)):
            # A path entry: read the JSON file and put its content on the stack.
            with open(config, encoding="utf-8") as config_file:
                stack.append(load(config_file))
        else:
            # Anything else is passed through unchanged as a ready-made config.
            stack.append(config)
    return stack
def store_filter(config, out):
    """
    Serializes a filter config as JSON into an output stream.

    Args:
        config (dict): filter config to serialize
        out (TextIOBase): writable file-like object that receives the JSON text
    """
    dump(obj=config, fp=out)
def filter_config(filter_type, value):
    """
    Builds the config dict for a filter of the given type.

    Only scenario, alternative and execution filter types are supported;
    any other ``filter_type`` raises ``KeyError``.

    Args:
        filter_type (str): the filter type (e.g. "scenario_filter")
        value (object): the filter value (e.g. scenario name)

    Returns:
        dict: filter configuration
    """
    config_factories = {
        SCENARIO_FILTER_TYPE: scenario_filter_config,
        ALTERNATIVE_FILTER_TYPE: alternative_filter_config,
        EXECUTION_FILTER_TYPE: execution_filter_config,
    }
    make_config = config_factories[filter_type]
    return make_config(value)
def append_filter_config(url: str, config: Union[dict, str]) -> str:
    """
    Returns ``url`` with the given filter config added to its query part.

    ``config`` can either be a config dict, a shorthand, or a path to a JSON file
    that contains the dictionary. Dicts are converted to shorthand before being stored.
    A config that is already present in the URL is not added a second time.
    """
    parts = urlparse(url)
    query_items = parse_qs(parts.query)
    present = query_items.setdefault(FILTER_IDENTIFIER, [])
    entry = config_to_shorthand(config) if isinstance(config, dict) else config
    if entry not in present:
        present.append(entry)
    # The query is re-encoded even when nothing was appended.
    updated = parts._replace(query=urlencode(query_items, doseq=True))
    return _unparse_url_ensuring_correct_slashes(updated)
def filter_configs(url: str) -> list[dict]:
    """Returns the filters found in the URL's query part, in their original order.

    Entries starting with the shorthand tag are expanded to config dicts; all other
    entries are returned unchanged as strings (paths to config JSON files).
    A URL without filters yields an empty list.
    """
    query = parse_qs(urlparse(url).query)
    entries = query.get(FILTER_IDENTIFIER, [])
    tag_length = len(SHORTHAND_TAG)
    return [
        _parse_shorthand(entry[tag_length:]) if entry.startswith(SHORTHAND_TAG) else entry
        for entry in entries
    ]
def pop_filter_configs(url):
    """
    Extracts the filter configs from a URL's query part and strips them from the URL.

    Shorthand entries are expanded to config dicts; other entries are returned as-is.
    If the URL has no filters, it is returned unmodified together with an empty list.

    Args:
        url (str): a URL

    Returns:
        tuple: a list of filter configs and the 'popped from' URL
    """
    parts = urlparse(url)
    query = parse_qs(parts.query)
    entries = query.pop(FILTER_IDENTIFIER, None)
    if entries is None:
        return [], url
    tag_length = len(SHORTHAND_TAG)
    configs = [
        _parse_shorthand(entry[tag_length:]) if entry.startswith(SHORTHAND_TAG) else entry
        for entry in entries
    ]
    # Rebuild the URL from whatever query parameters are left.
    remaining = parts._replace(query=urlencode(query, doseq=True))
    return configs, _unparse_url_ensuring_correct_slashes(remaining)
def clear_filter_configs(url: str) -> str:
    """Returns ``url`` without its filter query parameter.

    A URL that carries no filters is returned exactly as given.
    """
    parts = urlparse(url)
    query = parse_qs(parts.query)
    if FILTER_IDENTIFIER not in query:
        return url
    query.pop(FILTER_IDENTIFIER)
    stripped = parts._replace(query=urlencode(query, doseq=True))
    return _unparse_url_ensuring_correct_slashes(stripped)
def ensure_filtering(url, fallback_alternative=None):
    """
    Makes sure the URL filters by scenario or alternative, adding a fallback if it does not.

    The filters already present in the URL are always loaded first (config files
    are read from disk). If none of them provides a scenario name or alternative names
    and a fallback alternative is given, an alternative filter for the fallback is
    appended to the URL.

    Args:
        url (str): database URL
        fallback_alternative (str, optional): fallback alternative if URL does not contain
            scenario or alternative filters

    Returns:
        str: database URL
    """
    stack = load_filters(filter_configs(url))
    if fallback_alternative is None:
        return url
    # Stops at the first config that names a scenario or a non-empty set of alternatives.
    already_filtered = any(
        scenario_name_from_dict(config) is not None or alternative_names_from_dict(config)
        for config in stack
    )
    if already_filtered:
        return url
    return append_filter_config(url, alternative_filter_config([fallback_alternative]))
def config_to_shorthand(config):
    """
    Turns a filter config dictionary into its shorthand string.

    The converter is chosen by the config's ``"type"`` entry (``KeyError`` for an
    unknown type) and its result is prefixed with the shorthand tag.

    Args:
        config (dict): filter configuration

    Returns:
        str: config shorthand
    """
    converters_by_type = {
        ALTERNATIVE_FILTER_TYPE: alternative_filter_config_to_shorthand,
        ENTITY_CLASS_RENAMER_TYPE: entity_class_renamer_config_to_shorthand,
        PARAMETER_RENAMER_TYPE: parameter_renamer_config_to_shorthand,
        SCENARIO_FILTER_TYPE: scenario_filter_config_to_shorthand,
        EXECUTION_FILTER_TYPE: execution_filter_config_to_shorthand,
        VALUE_TRANSFORMER_TYPE: value_transformer_config_to_shorthand,
    }
    to_shorthand = converters_by_type[config["type"]]
    return SHORTHAND_TAG + to_shorthand(config)
def is_modifying_filter(filter_config: dict) -> bool:
    """Returns True for every filter config whose type is not the execution filter type."""
    filter_type = filter_config["type"]
    return filter_type != EXECUTION_FILTER_TYPE


def _parse_shorthand(shorthand):
    """
    Expands a shorthand filter config into a configuration dictionary.

    The text before the first ``:`` selects the parser, which then receives
    the complete shorthand string. An unknown tag raises ``KeyError``.

    Args:
        shorthand (str): a shorthand config string

    Returns:
        dict: filter configuration dictionary
    """
    parsers_by_tag = {
        ALTERNATIVE_FILTER_SHORTHAND_TAG: alternative_filter_shorthand_to_config,
        ENTITY_CLASS_RENAMER_SHORTHAND_TAG: entity_class_renamer_shorthand_to_config,
        PARAMETER_RENAMER_SHORTHAND_TAG: parameter_renamer_shorthand_to_config,
        SCENARIO_SHORTHAND_TAG: scenario_filter_shorthand_to_config,
        EXECUTION_SHORTHAND_TAG: execution_filter_shorthand_to_config,
        VALUE_TRANSFORMER_SHORTHAND_TAG: value_transformer_shorthand_to_config,
    }
    tag = shorthand.split(":", 1)[0]
    parse = parsers_by_tag[tag]
    return parse(shorthand)
def name_from_dict(config: dict) -> Optional[str]:
    """Returns the scenario name for a scenario filter config dict, None for any other filter type."""
    extractors_by_type = {SCENARIO_FILTER_TYPE: scenario_name_from_dict}
    extract_name = extractors_by_type.get(config["type"])
    return extract_name(config) if extract_name is not None else None
def _unparse_url_ensuring_correct_slashes(url):
    """
    :meta private:
    Converts URL tuple into string ensuring SqlAlchemy compatible format for SQLite URLs.

    URLs with a hostname are returned as ``url.geturl()`` gives them. For URLs without
    one, the first run of slashes is replaced unless it is already three slashes long:
    by ``//`` when the path is empty, by ``///`` on Windows or when the path contains
    a backslash, and by ``////`` otherwise.

    Args:
        url (NamedTuple): URL tuple

    Returns:
        str: URL as string
    """
    text = url.geturl()
    if url.hostname:
        return text
    # Length of the first consecutive run of slashes (0 if the URL has no slash at all).
    first_slash = text.find("/")
    if first_slash < 0:
        slash_count = 0
    else:
        tail = text[first_slash:]
        slash_count = len(tail) - len(tail.lstrip("/"))
    if slash_count == 3:
        return text
    # Split around that run; without any slash, fall back to splitting on "//".
    scheme, _, rest = text.partition(slash_count * "/" or "//")
    if not url.path:
        slashes = "//"
    elif sys.platform == "win32" or "\\" in url.path:
        slashes = "///"
    else:
        slashes = "////"
    return scheme + slashes + rest