import functools
import warnings
from typing import Optional
from kfp.v2 import dsl
[docs]
def override_docstring(docs: Optional[str] = None):
    """Build a decorator that replaces the ``__doc__`` of the decorated object.

    Args:
        docs (Optional[str], optional): Text to install as the docstring. When
            None, the docstring is copied from the object's ``python_func``
            attribute, provided that attribute exists and carries a non-empty
            docstring. Defaults to None.

    Returns:
        Callable: A decorator that mutates ``__doc__`` on its argument in place
        and returns that same object.
    """

    def _apply(target):
        # An explicit docstring always wins.
        if docs is not None:
            target.__doc__ = docs
            return target

        # No explicit text: fall back to the wrapped Python function, if any.
        if not hasattr(target, "python_func"):
            return target

        inherited = target.python_func.__doc__
        if inherited:
            target.__doc__ = inherited
        return target

    return _apply
[docs]
def spec(
    name: Optional[str] = None,
    cpu: Optional[str] = None,
    memory: Optional[str] = None,
    gpu: Optional[str] = None,
    accelerator: Optional[str] = None,
    caching: Optional[bool] = None,
):
    """Specify computing resources to be used by the component.

    This function is used as a decorator factory. The decorated callable is
    expected to return a task object; after it returns, the requested settings
    are applied to that task. CPU, memory, GPU and accelerator type can be
    specified, as well as the display name and the caching option.

    Calling this function emits a ``DeprecationWarning`` attributed to the
    caller (the line applying ``@spec(...)``).

    Args:
        name (Optional[str], optional): Display name for the component. Skipped
            when falsy. Defaults to None.
        cpu (Optional[str], optional): CPU limit (maximum) for the component.
            Skipped when falsy. Defaults to None.
        memory (Optional[str], optional): Memory limit (maximum) for the
            component. Skipped when falsy. Defaults to None.
        gpu (Optional[str], optional): GPU limit (maximum) for the component.
            Skipped when falsy. Defaults to None.
        accelerator (Optional[str], optional): Accelerator type requirement for
            the component, set as a node selector constraint on the
            ``cloud.google.com/gke-accelerator`` label. Skipped when falsy.
            Defaults to None.
        caching (Optional[bool], optional): Caching option for this task. Applied
            whenever it is not None, so ``False`` explicitly disables caching.
            Defaults to None.

    Returns:
        Callable: A decorator that wraps a task-returning callable and applies
        the specified settings to the returned task.

    .. deprecated:: 0.5.0
        Use :func:`container_spec`, :func:`display_name` and :func:`caching`
        decorators instead.
    """
    # stacklevel=2 attributes the warning to the code that applies the
    # decorator. With the default stacklevel the warning is attributed to this
    # module, which Python's default filters hide for DeprecationWarning.
    warnings.warn(
        "`spec` decorator is deprecated. Use `container_spec`, `display_name` and "
        "`caching` instead.",
        DeprecationWarning,
        stacklevel=2,
    )

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            task: dsl.ContainerOp = func(*args, **kwargs)
            if name:
                task.set_display_name(name)
            if cpu:
                task.container.set_cpu_limit(cpu)
            if memory:
                task.container.set_memory_limit(memory)
            if gpu:
                task.container.set_gpu_limit(gpu)
            if accelerator:
                task.add_node_selector_constraint(
                    "cloud.google.com/gke-accelerator", accelerator
                )
            # `is not None` (not truthiness) so caching=False is still applied.
            if caching is not None:
                task.set_caching_options(caching)
            return task

        return wrapper

    return decorator
[docs]
def container_spec(
    cpu: Optional[str] = None,
    memory: Optional[str] = None,
    gpu: Optional[str] = None,
    accelerator: Optional[str] = None,
):
    """Build a decorator that applies resource settings to a component's task.

    The decorated callable is invoked as usual; the task it returns then gets
    the requested limits applied in the order CPU, memory, GPU, accelerator.
    Any setting that is falsy (None or an empty string) is skipped.

    Args:
        cpu (Optional[str], optional): CPU limit (maximum) for the component.
            Defaults to None.
        memory (Optional[str], optional): Memory limit (maximum) for the
            component. Defaults to None.
        gpu (Optional[str], optional): GPU limit (maximum) for the component.
            Defaults to None.
        accelerator (Optional[str], optional): Accelerator type requirement,
            set as a node selector constraint on the
            ``cloud.google.com/gke-accelerator`` label. Defaults to None.

    Returns:
        Callable: A decorator that wraps a task-returning callable and returns
        the configured task.
    """
    # (requested value, name of the container setter), in application order.
    container_limits = (
        (cpu, "set_cpu_limit"),
        (memory, "set_memory_limit"),
        (gpu, "set_gpu_limit"),
    )

    def _configure(func):
        @functools.wraps(func)
        def _with_resources(*args, **kwargs):
            op: dsl.ContainerOp = func(*args, **kwargs)
            for requested, setter_name in container_limits:
                if requested:
                    # The container is only touched when a limit was requested.
                    getattr(op.container, setter_name)(requested)
            if accelerator:
                op.add_node_selector_constraint(
                    "cloud.google.com/gke-accelerator", accelerator
                )
            return op

        return _with_resources

    return _configure
[docs]
def display_name(name: str):
    """Build a decorator that sets the display name of a component's task.

    Args:
        name (str): Display name passed to the task's ``set_display_name``.

    Returns:
        Callable: A decorator that wraps a task-returning callable, sets the
        display name on the returned task, and returns that task.
    """

    def _rename(func):
        @functools.wraps(func)
        def _named(*args, **kwargs):
            op: dsl.ContainerOp = func(*args, **kwargs)
            # Applied unconditionally, whatever value `name` holds.
            op.set_display_name(name)
            return op

        return _named

    return _rename
[docs]
def caching(enable_caching: bool):
    """Build a decorator that sets the caching option of a component's task.

    Args:
        enable_caching (bool): Whether the component uses the cache from a
            previous run. Passed as-is to the task's ``set_caching_options``.

    Returns:
        Callable: A decorator that wraps a task-returning callable, applies the
        caching option to the returned task, and returns that task.
    """

    def _set_cache_policy(func):
        @functools.wraps(func)
        def _cached(*args, **kwargs):
            op: dsl.ContainerOp = func(*args, **kwargs)
            # Applied unconditionally, so False explicitly disables caching.
            op.set_caching_options(enable_caching)
            return op

        return _cached

    return _set_cache_policy