# Source code for kfp_toolbox.pipeline_parser

import json
import os
from dataclasses import dataclass
from typing import Any, Callable, Mapping, Optional, Sequence, Union

import yaml

# Scalar value types a pipeline parameter default may take.
ParameterValue = Union[int, float, str]


@dataclass
class Parameter:
    """A single pipeline parameter.

    Attributes:
        name: A name of the parameter.
        type: A type function of the parameter.
        default: A default value of the parameter.
    """

    name: str
    type: Callable
    default: Optional[ParameterValue] = None
@dataclass
class Pipeline:
    """A summary of a compiled pipeline.

    Attributes:
        name: A name of the pipeline.
        parameters: A sequence of the pipeline parameters.
        spec: The raw pipeline specification mapping.
    """

    name: str
    parameters: Sequence[Parameter]
    spec: Mapping[str, Any]
def _type_function_from_name(type_name: str) -> Callable: if type_name in {"Integer", "INT"}: type_function = int elif type_name in {"Float", "DOUBLE"}: type_function = float else: type_function = str return type_function def _actual_parameter_value(key: str, value: ParameterValue) -> ParameterValue: if key in {"Integer", "intValue"}: actual_value = int(value) elif key in {"Float", "doubleValue"}: actual_value = float(value) else: actual_value = str(value) return actual_value def _create_pipeline(pipeline_spec: Mapping[str, Any]) -> Pipeline: pipeline_name = pipeline_spec["pipelineSpec"]["pipelineInfo"]["name"] input_definitions = ( pipeline_spec["pipelineSpec"]["root"] .get("inputDefinitions", {}) .get("parameters", {}) ) runtime_config = pipeline_spec["runtimeConfig"].get("parameters", {}) parameters = [] for parameter_name, parameter_type in input_definitions.items(): parameter = Parameter( name=parameter_name, type=_type_function_from_name(parameter_type["type"]) ) if runtime_config.get(parameter_name): key, value = list(runtime_config[parameter_name].items())[0] parameter.default = _actual_parameter_value(key, value) parameters.append(parameter) return Pipeline(name=pipeline_name, parameters=parameters, spec=pipeline_spec) def _create_v1_pipeline(pipeline_spec: Mapping[str, Any]) -> Pipeline: spec_json_str = pipeline_spec["metadata"]["annotations"][ "pipelines.kubeflow.org/pipeline_spec" ] spec_json = json.loads(spec_json_str) pipeline_name = spec_json["name"] parameters = [] for item in spec_json["inputs"]: if item["name"] in {"pipeline-root", "pipeline-name"}: continue parameter = Parameter( name=item["name"], type=_type_function_from_name(item["type"]) ) if item.get("default") is not None: parameter.default = _actual_parameter_value(item["type"], item["default"]) parameters.append(parameter) return Pipeline(name=pipeline_name, parameters=parameters, spec=pipeline_spec)
def parse_pipeline_package(filepath: Union[str, os.PathLike]) -> Pipeline:
    """Parse the pipeline package file.

    Load a :class:`Pipeline` object from a pre-compiled file that
    represents the pipeline.

    Args:
        filepath (Union[str, os.PathLike]): The path of the pre-compiled
            file that represents the pipeline.

    Raises:
        ValueError: If the :attr:`filepath` file has an invalid schema.

    Returns:
        Pipeline: An object that represents the pipeline.
    """
    path = os.fspath(filepath)
    with open(path, "r") as package_file:
        spec = yaml.safe_load(package_file)

    # KFP v2 pipeline job: a top-level pipelineSpec with root/pipelineInfo
    # plus a runtimeConfig section.
    looks_like_v2 = (
        isinstance(spec, dict)
        and "pipelineSpec" in spec
        and "root" in spec["pipelineSpec"]
        and "pipelineInfo" in spec["pipelineSpec"]
        and "runtimeConfig" in spec
    )
    if looks_like_v2:
        return _create_pipeline(spec)

    # KFP v1 workflow manifest: metadata carried in a well-known annotation.
    looks_like_v1 = (
        isinstance(spec, dict)
        and "metadata" in spec
        and "annotations" in spec["metadata"]
        and "pipelines.kubeflow.org/pipeline_spec"
        in spec["metadata"]["annotations"]
    )
    if looks_like_v1:
        return _create_v1_pipeline(spec)

    raise ValueError(f"invalid schema: {path}")