# Source code for hubdata.create_target_data_schema

import json
from enum import Enum, auto
from pathlib import Path

import pyarrow as pa
import structlog

from hubdata.connect_hub import HubConnection, connect_hub
from hubdata.create_hub_schema import _pa_type_for_hub_type

# module-level structured logger shared by the helpers below
logger = structlog.get_logger()


class TargetType(Enum):
    """The kind of target data to build a schema for - selects which target-type specific configuration is used."""

    TIME_SERIES = auto()  # indicates time-series target data is to be used
    ORACLE_OUTPUT = auto()  # indicates oracle-output target data is to be used


def create_target_data_schema(hub_path: str | Path, target_type: TargetType) -> pa.Schema | None:
    """
    Top-level function for creating a time-series target schema or oracle-output target schema for the passed
    `hub_path`.

    :param hub_path: str (for local file system hubs or cloud based ones) or Path (local file systems only) pointing
        to a hub's root directory. It is passed to
        https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.from_uri
        From that page: Recognized URI schemes are "file", "mock", "s3fs", "gs", "gcs", "hdfs" and "viewfs". In
        addition, the argument can be a local path, either a pathlib.Path object or a str. NB: Passing a local path
        as a str requires an ABSOLUTE path, but passing the hub as a Path can be a relative path.
    :param target_type: a TargetType specifying the target data schema type
    :return: a `pyarrow.Schema` for the passed `hub_path` if a `hub-config/target-data.json` file is present.
        otherwise returns None
    :raise: RuntimeError if `hub_path` is invalid
    """
    hub_conn = connect_hub(hub_path)

    # a missing hub-config/target-data.json means there is nothing to build a schema from
    target_data = _target_data_json(hub_conn)
    if target_data is None:
        return None

    is_time_series = target_type == TargetType.TIME_SERIES
    return pa.schema(_col_name_to_pa_type_for_target_data(hub_conn.schema, target_data, is_time_series))
def _target_data_json(hub_conn: HubConnection) -> dict | None: """ Helper that returns the contents of `hub_connection`'s hub-config/target-data.json file if found. Returns None otherwise. :param hub_conn: the hub's HubConnection :return: hub-config/target-data.json file as a dict, or None if not found """ try: with (hub_conn._filesystem.open_input_file(f'{hub_conn._filesystem_path}/hub-config/target-data.json') as fp): return json.load(fp) except Exception: logger.warn('target-data.json not found. using inferred schema from data') return None def _col_name_to_pa_type_for_target_data(hub_schema: pa.Schema, target_data: dict, is_time_series: bool) -> dict[str, pa.DataType]: """ Helper that returns a mapping of `hub-config/target-data.json` column names to pa.DataTypes. :param hub_schema: a hub schema as returned by `create_hub_schema()` :param target_data: as returned by `_target_data_json()` :param is_time_series: True if output is for time-series target data, and False if for oracle-output target data :return: col_name_to_pa_type """ # process target-data.json sections, filling col_name_to_pa_type property_name = 'time-series' if is_time_series else 'oracle-output' col_name_to_pa_type: dict[str, pa.DataType] = {} # top-level property: `observable_unit` (required): task ID column names. get types from regular schema # (tasks.json). can be overridden by target-type specific configuration ts_observable_unit = target_data[property_name]['observable_unit'] \ if (property_name in target_data) and ('observable_unit' in target_data[property_name]) \ else target_data['observable_unit'] for column_name in ts_observable_unit: col_name_to_pa_type[column_name] = hub_schema.field(column_name).type # top-level property: `date_col` (required): date column name. a Date. 
may or may not be in `observable_unit` date_col = target_data['date_col'] if date_col not in col_name_to_pa_type: col_name_to_pa_type[date_col] = pa.date32() # top-level property: `versioned` (optional): whether all target type datasets are versioned using `as_of` dates. # defaults to False. can be overridden by target-type specific configuration ts_versioned = target_data[property_name]['versioned'] \ if (property_name in target_data) and ('versioned' in target_data[property_name]) \ else (target_data['versioned'] if 'versioned' in target_data else False) if ts_versioned: col_name_to_pa_type['as_of'] = pa.date32() if is_time_series: # time-series specific # target-type specific configuration: `time-series` > `non_task_id_schema` (optional): additional # (column_name:r_data_type) key-value pairs non_task_id_schema = target_data[property_name]['non_task_id_schema'] \ if (property_name in target_data) and ('non_task_id_schema' in target_data[property_name]) \ else {} for column_name, hub_type in non_task_id_schema.items(): col_name_to_pa_type[column_name] = _pa_type_for_hub_type(hub_type) # `observation` column: [implicit to time-series data]: same type as `value` from regular schema (tasks.json) col_name_to_pa_type['observation'] = hub_schema.field('value').type else: # oracle-output specific # target-type specific configuration: `oracle-output` > `has_output_type_ids` (optional): Indicates whether the # oracle-output data have an `output_type` and `output_type_id` column. 
has_output_type_ids = target_data[property_name]['has_output_type_ids'] \ if (property_name in target_data) and ('has_output_type_ids' in target_data[property_name]) \ else False if has_output_type_ids: col_name_to_pa_type['output_type'] = hub_schema.field('output_type').type col_name_to_pa_type['output_type_id'] = hub_schema.field('output_type_id').type # `oracle_value` column: [implicit to oracle-output data]: same type as `value` from regular schema (tasks.json) col_name_to_pa_type['oracle_value'] = hub_schema.field('value').type # done return col_name_to_pa_type