Source code for hubdata.connect_hub

import json
from pathlib import Path

import pyarrow as pa
import pyarrow.dataset as ds
import structlog
from pyarrow import fs

from hubdata.create_hub_schema import create_hub_schema

logger = structlog.get_logger()


def connect_hub(hub_path: str | Path):
    """
    The main entry point for connecting to a hub. Provides access to the instance variables documented in
    `HubConnection`, including admin.json and tasks.json as dicts. It also allows connecting to data in the hub's
    model output directory for querying and filtering across all model files. The hub can be located in a local
    file system or in the cloud on AWS or GCS.

    Note: Calls `create_hub_schema()` to get the schema to use when calling `HubConnection.get_dataset()`.

    See https://docs.hubverse.io/en/latest/user-guide/hub-structure.html for details on how hub directories are
    laid out.

    :param hub_path: str (for local file system hubs or cloud-based ones) or Path (local file systems only)
        pointing to a hub's root directory. It is passed to
        https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.from_uri
        From that page: Recognized URI schemes are "file", "mock", "s3fs", "gs", "gcs", "hdfs" and "viewfs". In
        addition, the argument can be a local path, either a pathlib.Path object or a str. NB: Passing a local
        path as a str requires an ABSOLUTE path, but passing the hub as a Path allows a relative path.
    :return: a HubConnection
    :raise: RuntimeError if `hub_path` is invalid
    """
    return HubConnection(hub_path)
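A minimal usage sketch (illustrative, not part of the module; the hub locations below are hypothetical):

from pathlib import Path

from hubdata.connect_hub import connect_hub

hub_connection = connect_hub(Path('hubs/example-hub'))     # hypothetical local hub as a Path; may be relative
# hub_connection = connect_hub('s3://example-bucket/hub')  # hypothetical cloud hub as a URI str
print(hub_connection.admin)                                # admin.json parsed as a dict
print(hub_connection.model_output_dir)                     # e.g., '.../model-output'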
class HubConnection:
    """
    Provides convenient access to various parts of a hub's `tasks.json` file. Use the `connect_hub` function to
    create instances of this class rather than instantiating it directly.

    Instance variables:

    - hub_path: str pointing to a hub's root directory as passed to `connect_hub()`
    - schema: the pa.Schema for `HubConnection.get_dataset()`, created by the constructor via `create_hub_schema()`
    - admin: the hub's `admin.json` contents as a dict
    - tasks: the hub's `tasks.json` contents as a dict
    - model_output_dir: path to the hub's model output directory
    """

    def __init__(self, hub_path: str | Path):
        """
        :param hub_path: str or Path pointing to a hub's root directory as passed to `connect_hub()`
        """
        # set self.hub_path and then get an arrow FileSystem for it, letting it decide the correct subclass based
        # on that arg, catching any errors. also set two internal instance variables used by
        # HubConnection.get_dataset(): self._filesystem and self._filesystem_path
        self.hub_path: str | Path = hub_path
        try:
            self._filesystem, self._filesystem_path = fs.FileSystem.from_uri(self.hub_path)
        except Exception:
            raise RuntimeError(f'invalid hub_path: {self.hub_path}')

        # set self.admin and self.tasks, checking for existence
        try:
            with self._filesystem.open_input_file(f'{self._filesystem_path}/hub-config/admin.json') as admin_fp, \
                    self._filesystem.open_input_file(f'{self._filesystem_path}/hub-config/tasks.json') as tasks_fp:
                self.admin = json.load(admin_fp)
                self.tasks = json.load(tasks_fp)
        except Exception as ex:
            raise RuntimeError(f'admin.json or tasks.json not found: {ex}')

        # set schema
        self.schema = create_hub_schema(self.tasks)

        # set self.model_output_dir, first checking for directory existence. the directory name defaults to
        # 'model-output' when admin.json does not specify one
        model_output_dir_name = self.admin.get('model_output_dir', 'model-output')
        model_output_dir = f'{self._filesystem_path}/{model_output_dir_name}'
        if self._filesystem.get_file_info(model_output_dir).type == fs.FileType.NotFound:
            logger.warning(f'model_output_dir not found: {model_output_dir!r}')
        self.model_output_dir = model_output_dir
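For context, a short sketch of what `fs.FileSystem.from_uri()` returns for the two kinds of `hub_path` the
constructor accepts (the paths below are hypothetical):

from pyarrow import fs

filesystem, path = fs.FileSystem.from_uri('/data/hubs/example-hub')   # (LocalFileSystem, '/data/hubs/example-hub')
filesystem, path = fs.FileSystem.from_uri('s3://example-bucket/hub')  # (S3FileSystem, 'example-bucket/hub')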
    def get_dataset(self) -> ds.Dataset:
        """
        :return: a pyarrow.dataset.Dataset for my model_output_dir
        """
        # create the dataset. NB: we are using dataset "directory partitioning" to automatically get the
        # `model_id` column from directory names.
        # NB: we force file_formats to .parquet if not a LocalFileSystem (e.g., an S3FileSystem). otherwise we
        # use the list from self.admin['file_format']
        file_formats = ['parquet'] if not isinstance(self._filesystem, fs.LocalFileSystem) \
            else self.admin['file_format']
        datasets = [ds.dataset(self.model_output_dir,
                               filesystem=self._filesystem,
                               format=file_format,
                               partitioning=['model_id'],  # NB: hard-coded partitioning!
                               exclude_invalid_files=True,
                               schema=self.schema)
                    for file_format in file_formats]
        datasets = [dataset for dataset in datasets if len(dataset.files) != 0]
        if len(datasets) == 1:
            return datasets[0]
        else:
            return ds.dataset([dataset for dataset in datasets
                               if isinstance(dataset, pa.dataset.FileSystemDataset) and (len(dataset.files) != 0)])
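A hedged query sketch against the returned dataset (`hub_connection` continues the hypothetical example above;
the filter uses the hard-coded `model_id` partition column, and 'example-model' is a made-up model name):

import pyarrow.compute as pc

dataset = hub_connection.get_dataset()
table = dataset.to_table(filter=pc.field('model_id') == 'example-model')  # predicate pushdown on the partition
print(table.num_rows)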
    def to_table(self, *args, **kwargs) -> pa.Table:
        """
        A helper function that simply passes args and kwargs to `pyarrow.dataset.Dataset.to_table()`, returning
        the resulting `pyarrow.Table`.
        """
        return self.get_dataset().to_table(*args, **kwargs)
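The same filter can be pushed through this helper directly, for example (again with the hypothetical
`hub_connection` and model name from above):

import pyarrow.compute as pc

table = hub_connection.to_table(filter=pc.field('model_id') == 'example-model')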