# Source code for kfp_toolbox.pipeline_jobs

import os
from typing import Any, Mapping, Optional, Union


def submit_pipeline_job(
    pipeline_file: Union[str, os.PathLike],
    endpoint: Optional[str] = None,
    iap_client_id: Optional[str] = None,
    api_namespace: str = "kubeflow",
    other_client_id: Optional[str] = None,
    other_client_secret: Optional[str] = None,
    arguments: Optional[Mapping[str, Any]] = None,
    run_name: Optional[str] = None,
    experiment_name: Optional[str] = None,
    namespace: Optional[str] = None,
    pipeline_root: Optional[str] = None,
    enable_caching: Optional[bool] = None,
    service_account: Optional[str] = None,
    encryption_spec_key_name: Optional[str] = None,
    labels: Optional[Mapping[str, str]] = None,
    project: Optional[str] = None,
    location: Optional[str] = None,
    network: Optional[str] = None,
):
    """Submit a pipeline job to the appropriate backend.

    When :attr:`endpoint` is given, the job is submitted to a Kubeflow
    Pipelines instance at that endpoint; otherwise it is submitted to
    Vertex AI Pipelines.

    Args:
        pipeline_file (Union[str, os.PathLike]): Path of the pipeline
            package file.
        endpoint (Optional[str], optional): Endpoint of the KFP API service
            to connect. Used only for Kubeflow Pipelines. Defaults to None.
        iap_client_id (Optional[str], optional): The client ID used by
            Identity-Aware Proxy. Used only for Kubeflow Pipelines.
            Defaults to None.
        api_namespace (str, optional): Kubernetes namespace to connect to
            the KFP API. Used only for Kubeflow Pipelines. Defaults to
            "kubeflow".
        other_client_id (Optional[str], optional): The client ID used to
            obtain the auth codes and refresh tokens. Used only for
            Kubeflow Pipelines. Defaults to None.
        other_client_secret (Optional[str], optional): The client secret
            used to obtain the auth codes and refresh tokens. Used only for
            Kubeflow Pipelines. Defaults to None.
        arguments (Optional[Mapping[str, Any]], optional): Arguments to the
            pipeline function provided as a dict. Defaults to None.
        run_name (Optional[str], optional): Name of the run to be shown in
            the UI. Defaults to None.
        experiment_name (Optional[str], optional): Name of the experiment
            to add the run to. Defaults to None.
        namespace (Optional[str], optional): Kubernetes namespace where the
            pipeline runs are created. Used only for Kubeflow Pipelines.
            Defaults to None.
        pipeline_root (Optional[str], optional): The root path of the
            pipeline outputs. Defaults to None.
        enable_caching (Optional[bool], optional): Whether or not to enable
            caching for the run. Defaults to None.
        service_account (Optional[str], optional): Specifies which
            Kubernetes service account this run uses. Defaults to None.
        encryption_spec_key_name (Optional[str], optional): The Cloud KMS
            resource identifier of the customer managed encryption key used
            to protect the job. Used only for Vertex AI Pipelines.
            Defaults to None.
        labels (Optional[Mapping[str, str]], optional): The user defined
            metadata to organize PipelineJob. Used only for Vertex AI
            Pipelines. Defaults to None.
        project (Optional[str], optional): The project that you want to run
            this PipelineJob in. Used only for Vertex AI Pipelines.
            Defaults to None.
        location (Optional[str], optional): Location to create PipelineJob.
            Used only for Vertex AI Pipelines. Defaults to None.
        network (Optional[str], optional): The full name of the Compute
            Engine network to which the job should be peered. Used only for
            Vertex AI Pipelines. Defaults to None.
    """
    # Normalize PathLike inputs up front; both backends want a plain str.
    package_path = os.fspath(pipeline_file)

    if endpoint:
        # Kubeflow Pipelines branch. Imported lazily so the Vertex-only
        # path does not require the kfp client to be installed.
        import kfp

        kfp_client = kfp.Client(
            host=endpoint,
            client_id=iap_client_id,
            namespace=api_namespace,
            other_client_id=other_client_id,
            other_client_secret=other_client_secret,
        )
        kfp_client.create_run_from_pipeline_package(
            pipeline_file=package_path,
            arguments=arguments,  # type: ignore
            run_name=run_name,
            experiment_name=experiment_name,
            namespace=namespace,
            pipeline_root=pipeline_root,
            enable_caching=enable_caching,
            service_account=service_account,
        )
        return

    # Vertex AI Pipelines branch (no endpoint given). Imported lazily so
    # the Kubeflow-only path does not require google-cloud-aiplatform.
    from google.cloud import aiplatform

    vertex_job = aiplatform.PipelineJob(
        display_name=None,  # type: ignore # will be generated
        template_path=package_path,
        job_id=run_name,
        pipeline_root=pipeline_root,
        parameter_values=arguments,  # type: ignore
        enable_caching=enable_caching,
        encryption_spec_key_name=encryption_spec_key_name,
        labels=labels,  # type: ignore
        project=project,
        location=location,
    )
    vertex_job.submit(
        service_account=service_account,
        network=network,
        experiment=experiment_name,
    )