import json
import os
from dataclasses import dataclass
from typing import Any, Callable, Mapping, Optional, Sequence, Union

import yaml

ParameterValue = Union[int, float, str]


@dataclass
class Parameter:
    """Pipeline parameter.

    A class that represents a single pipeline parameter.

    Attributes:
        name: The name of the parameter.
        type: The type function used to coerce the parameter's value.
        default: The default value of the parameter, if any.
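
    Example:
        A minimal sketch; the parameter name and values here are made up
        for illustration::

            >>> parameter = Parameter(name="epochs", type=int, default=10)
            >>> parameter.type("3")
            3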
"""
name: str
type: Callable
default: Optional[ParameterValue] = None


@dataclass
class Pipeline:
    """Pipeline.

    A class that summarizes pipeline information.

    Attributes:
        name: The name of the pipeline.
        parameters: A sequence of the pipeline parameters.
        spec: The raw specification mapping loaded from the package file.
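
    Example:
        A sketch of constructing a :class:`Pipeline` by hand; real instances
        are normally produced by :func:`parse_pipeline_package`::

            >>> pipeline = Pipeline(
            ...     name="example",
            ...     parameters=[Parameter(name="epochs", type=int, default=10)],
            ...     spec={},
            ... )
            >>> pipeline.parameters[0].name
            'epochs'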
"""
name: str
parameters: Sequence[Parameter]
spec: Mapping[str, Any]


def _type_function_from_name(type_name: str) -> Callable:
    # Map a type name from a pipeline spec to the Python callable used to
    # coerce parameter values; unrecognized names fall back to str.
    if type_name in {"Integer", "INT"}:
        type_function = int
    elif type_name in {"Float", "DOUBLE"}:
        type_function = float
    else:
        type_function = str
    return type_function
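

# For illustration, a sketch of the mapping this helper implements (the
# spellings appear to correspond to the v1 and v2 package formats handled
# below):
#
#     _type_function_from_name("Integer")  # -> int   (v1)
#     _type_function_from_name("INT")      # -> int   (v2)
#     _type_function_from_name("DOUBLE")   # -> float (v2)
#     _type_function_from_name("String")   # -> str   (fallback)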


def _actual_parameter_value(key: str, value: ParameterValue) -> ParameterValue:
    # Coerce a raw default value to its declared type. The key is either a
    # v1 type name ("Integer", "Float") or a v2 runtimeConfig value key
    # ("intValue", "doubleValue"); anything else is treated as a string.
    if key in {"Integer", "intValue"}:
        actual_value = int(value)
    elif key in {"Float", "doubleValue"}:
        actual_value = float(value)
    else:
        actual_value = str(value)
    return actual_value
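

# A sketch of how the coercion behaves for the keys seen in this module
# ("Integer"/"Float" come from v1 specs, "intValue"/"doubleValue" from v2
# runtimeConfig entries):
#
#     _actual_parameter_value("intValue", "3")       # -> 3
#     _actual_parameter_value("doubleValue", "0.5")  # -> 0.5
#     _actual_parameter_value("stringValue", 42)     # -> "42"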


def _create_pipeline(pipeline_spec: Mapping[str, Any]) -> Pipeline:
    # Build a Pipeline from a v2 package: parameter types come from the root
    # inputDefinitions, defaults from the runtimeConfig section.
    pipeline_name = pipeline_spec["pipelineSpec"]["pipelineInfo"]["name"]
    input_definitions = (
        pipeline_spec["pipelineSpec"]["root"]
        .get("inputDefinitions", {})
        .get("parameters", {})
    )
    runtime_config = pipeline_spec["runtimeConfig"].get("parameters", {})

    parameters = []
    for parameter_name, parameter_type in input_definitions.items():
        parameter = Parameter(
            name=parameter_name, type=_type_function_from_name(parameter_type["type"])
        )
        if runtime_config.get(parameter_name):
            # Each runtimeConfig entry is a single-item mapping such as
            # {"intValue": "3"}; its key determines the value's type.
            key, value = list(runtime_config[parameter_name].items())[0]
            parameter.default = _actual_parameter_value(key, value)
        parameters.append(parameter)
    return Pipeline(name=pipeline_name, parameters=parameters, spec=pipeline_spec)
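

# For reference, a sketch of the v2 package shape _create_pipeline expects
# (abbreviated; a real package produced by a compiler contains much more):
#
#     {
#         "pipelineSpec": {
#             "pipelineInfo": {"name": "example"},
#             "root": {
#                 "inputDefinitions": {
#                     "parameters": {"epochs": {"type": "INT"}}
#                 }
#             },
#         },
#         "runtimeConfig": {"parameters": {"epochs": {"intValue": "10"}}},
#     }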


def _create_v1_pipeline(pipeline_spec: Mapping[str, Any]) -> Pipeline:
    # Build a Pipeline from a v1 package: the pipeline metadata is embedded
    # as a JSON string in a Kubeflow annotation on the workflow manifest.
    spec_json_str = pipeline_spec["metadata"]["annotations"][
        "pipelines.kubeflow.org/pipeline_spec"
    ]
    spec_json = json.loads(spec_json_str)
    pipeline_name = spec_json["name"]

    parameters = []
    for item in spec_json.get("inputs", []):
        # Skip the system-supplied inputs that are not user parameters.
        if item["name"] in {"pipeline-root", "pipeline-name"}:
            continue
        parameter = Parameter(
            name=item["name"], type=_type_function_from_name(item["type"])
        )
        if item.get("default") is not None:
            parameter.default = _actual_parameter_value(item["type"], item["default"])
        parameters.append(parameter)
    return Pipeline(name=pipeline_name, parameters=parameters, spec=pipeline_spec)
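

# Likewise, a sketch of the v1 package shape _create_v1_pipeline expects: a
# manifest whose annotation carries the pipeline spec as a JSON string
# (abbreviated, and wrapped here only for readability):
#
#     {
#         "metadata": {
#             "annotations": {
#                 "pipelines.kubeflow.org/pipeline_spec":
#                     '{"name": "example", "inputs": [{"name": "epochs",
#                       "type": "Integer", "default": "10"}]}'
#             }
#         }
#     }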


def parse_pipeline_package(filepath: Union[str, os.PathLike]) -> Pipeline:
    """Parse a pipeline package file.

    Load a :class:`Pipeline` object from a pre-compiled file that represents
    the pipeline.

    Args:
        filepath (Union[str, os.PathLike]): The path of the pre-compiled file
            that represents the pipeline.

    Raises:
        ValueError: If the file at :attr:`filepath` has an invalid schema.

    Returns:
        Pipeline: An object that represents the pipeline.
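
    Example:
        A sketch of typical usage; ``pipeline.yaml`` is a placeholder for a
        package produced by a pipeline compiler::

            pipeline = parse_pipeline_package("pipeline.yaml")
            for parameter in pipeline.parameters:
                print(parameter.name, parameter.default)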
"""
filepath_str = os.fspath(filepath)
with open(filepath_str, "r") as f:
pipeline_spec = yaml.safe_load(f)
if (
isinstance(pipeline_spec, dict)
and "pipelineSpec" in pipeline_spec
and "root" in pipeline_spec["pipelineSpec"]
and "pipelineInfo" in pipeline_spec["pipelineSpec"]
and "runtimeConfig" in pipeline_spec
):
pipeline = _create_pipeline(pipeline_spec)
elif (
isinstance(pipeline_spec, dict)
and "metadata" in pipeline_spec
and "annotations" in pipeline_spec["metadata"]
and "pipelines.kubeflow.org/pipeline_spec"
in pipeline_spec["metadata"]["annotations"]
):
pipeline = _create_v1_pipeline(pipeline_spec)
else:
raise ValueError(f"invalid schema: {filepath_str}")
return pipeline