Building Python Function-based Components

Build your own lightweight pipeline components using the Pipelines SDK v2 and Python

A Kubeflow Pipelines component is a self-contained set of code that performs one step in your ML workflow. A pipeline component is composed of:

  • The component code, which implements the logic needed to perform a step in your ML workflow.

  • A component specification, which defines the following:

    • The component’s metadata: its name and description.
    • The component’s interface: the component’s inputs and outputs.
    • The component’s implementation: the Docker container image to run, how to pass inputs to your component code, and how to get the component’s outputs.
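
As a rough illustration of the three parts listed above, the following sketch writes a specification as a plain Python dictionary. The field names are modeled on the component YAML format but simplified, and the `add.py` script and the `describe` helper are hypothetical; the SDK generates the real specification for you.

```python
# Illustrative only: a simplified, dict-shaped view of a component
# specification. The SDK generates the real specification.
component_spec = {
    # Metadata
    "name": "Add",
    "description": "Calculates sum of two arguments",
    # Interface
    "inputs": [
        {"name": "a", "type": "Float"},
        {"name": "b", "type": "Float"},
    ],
    "outputs": [{"name": "Output", "type": "Float"}],
    # Implementation
    "implementation": {
        "container": {
            "image": "python:3.7",
            "command": ["python3", "add.py"],  # hypothetical script name
            "args": [
                "--a", {"inputValue": "a"},
                "--b", {"inputValue": "b"},
                "--output", {"outputPath": "Output"},
            ],
        }
    },
}


def describe(spec):
    """Summarizes a specification on one line (hypothetical helper)."""
    inputs = ", ".join(i["name"] for i in spec["inputs"])
    outputs = ", ".join(o["name"] for o in spec["outputs"])
    image = spec["implementation"]["container"]["image"]
    return f"{spec['name']}({inputs}) -> {outputs} [image: {image}]"


print(describe(component_spec))
```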

Python function-based components make it easier to iterate quickly by letting you build your component code as a Python function and generating the component specification for you. This document describes how to build Python function-based components and use them in your pipeline.

Note: This guide demonstrates how to build components using the Pipelines SDK v2. Currently, Kubeflow Pipelines v2 is in development. You can use this guide to start building and running pipelines that are compatible with the Pipelines SDK v2.

Learn more about Pipelines SDK v2.

Before you begin

  1. Run the following command to install the Kubeflow Pipelines SDK v1.6.2 or higher. If you run this command in a Jupyter notebook, restart the kernel after installing the SDK.
$ pip install --upgrade kfp
  2. Import the kfp, kfp.dsl, and kfp.v2.dsl packages.
import kfp
import kfp.dsl as dsl
from kfp.v2.dsl import (
    component,
    Input,
    Output,
    Dataset,
    Metrics,
)
  3. Create an instance of the kfp.Client class by following the steps in connecting to Kubeflow Pipelines using the SDK client.
client = kfp.Client() # change arguments accordingly

For more information about the Kubeflow Pipelines SDK, see the SDK reference guide.

Getting started with Python function-based components

This section demonstrates how to get started building Python function-based components by walking through the process of creating a simple component.

  1. Define your component’s code as a standalone Python function. In this example, the function adds two floats and returns their sum. Use the kfp.v2.dsl.component decorator to convert the function into a factory function that you can use to create kfp.dsl.ContainerOp class instances to use as steps in your pipeline.
@component
def add(a: float, b: float) -> float:
  '''Calculates sum of two arguments'''
  return a + b
  2. Create your pipeline and specify its argument values. Learn more about creating and running pipelines.
import kfp.dsl as dsl
@dsl.pipeline(
  name='addition-pipeline',
  description='An example pipeline that performs addition calculations.',
  pipeline_root='gs://my-pipeline-root/example-pipeline'
)
def add_pipeline(
  a: float=1,
  b: float=7,
):
  # Passes a pipeline parameter and a constant value to the `add` factory
  # function.
  first_add_task = add(a, 4)
  # Passes an output reference from `first_add_task` and a pipeline parameter
  # to the `add` factory function. For operations with a single return
  # value, the output reference can be accessed as `task.output` or
  # `task.outputs['output_name']`.
  second_add_task = add(first_add_task.output, b)

# Specify pipeline argument values
arguments = {'a': 7, 'b': 8}
  3. Compile and run your pipeline. Learn more about compiling and running pipelines.
# Submit a pipeline run using the v2 compatible mode
client.create_run_from_pipeline_func(
    add_pipeline,
    arguments=arguments,
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE)

Building Python function-based components

Use the following instructions to build a Python function-based component:

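  1. Define a standalone Python function. This function must meet the following requirements:

    • It must not use any code declared outside of the function definition.
    • Import statements must be added inside the function.
    • Helper functions must be defined inside the function.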

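  2. Kubeflow Pipelines uses your function’s inputs and outputs to define your component’s interface. Learn more about passing data between components. Your function’s inputs and outputs must meet the following requirements:

    • All of your function’s arguments must have data type annotations.
    • If your function accepts or returns large amounts of data or complex data types, annotate that argument as an artifact.
    • If your component returns multiple outputs by value, annotate your function with the typing.NamedTuple type hint and use the collections.namedtuple function to return the outputs.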

  3. (Optional.) If your function has complex dependencies, choose or build a container image for your Python function to run in. Learn more about selecting or building your component’s container image.

  4. Add the kfp.v2.dsl.component decorator to convert your function into a pipeline component. You can specify the following arguments to the decorator:

    • base_image: (Optional.) Specify the Docker container image to run this function in. Learn more about selecting or building a container image.
    • output_component_file: (Optional.) Writes your component definition to a file. You can use this file to share the component with colleagues or reuse it in different pipelines.
    • packages_to_install: (Optional.) A list of versioned Python packages to install before running your function.

Using and installing Python packages

When Kubeflow Pipelines runs your pipeline, each component runs within a Docker container image on a Kubernetes Pod. To load the packages that your Python function depends on, one of the following must be true:

  • The package must be installed on the container image.
  • The package must be defined using the packages_to_install parameter of the kfp.v2.dsl.component decorator.
  • Your function must install the package. For example, your function can use the subprocess module to run a command like pip install that installs a package.
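
The third option can be sketched as follows. This is a minimal example under stated assumptions: `pip_install_command` and `import_or_install` are hypothetical helper names, and in a real component they would need to be defined inside the component function. The demonstration call uses the standard library `json` module, so nothing is actually installed.

```python
import importlib
import subprocess
import sys


def pip_install_command(package_spec):
    """Builds a pip command that targets the interpreter running this code."""
    return [sys.executable, "-m", "pip", "install", "--quiet", package_spec]


def import_or_install(module_name, package_spec):
    """Imports module_name, installing package_spec first if it is missing."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        subprocess.check_call(pip_install_command(package_spec))
        # Make the freshly installed package visible to the import system.
        importlib.invalidate_caches()
        return importlib.import_module(module_name)


# `json` ships with Python, so this call imports it without running pip.
json_module = import_or_install("json", "json")
print(json_module.dumps({"ok": True}))
```

Installing at run time adds a download to every execution of the step, which is why the first two options are usually preferable for heavier dependencies.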

Selecting or building a container image

Currently, if you do not specify a container image, your Python function-based component uses the python:3.7 container image. If your function has complex dependencies, you may benefit from using a container image that has your dependencies preinstalled, or from building a custom container image. Preinstalling your dependencies shortens your component’s run time, since your component does not need to download and install packages each time it runs.

Many frameworks, such as TensorFlow and PyTorch, and cloud service providers offer prebuilt container images that have common dependencies installed.

If a prebuilt container is not available, you can build a custom container image with your Python function’s dependencies. For more information about building a custom container, read the Dockerfile reference guide in the Docker documentation.

If you build or select a container image, instead of using the default container image, the container image must use Python 3.5 or later.

Understanding how data is passed between components

When Kubeflow Pipelines runs your component, a container image is started in a Kubernetes Pod and your component’s inputs are passed in as command-line arguments. When your component has finished, the component’s outputs are returned as files.
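
To make this contract concrete, here is a hand-written sketch of the kind of program a component container might run: it reads its inputs from command-line arguments and writes its output to a file at a path it is given. The flag names and the `main` function are assumptions for illustration, not the code that the SDK generates.

```python
import argparse
import os
import tempfile


def main(argv):
    # Inputs arrive as command-line arguments (strings), so they are parsed.
    parser = argparse.ArgumentParser()
    parser.add_argument("--a", type=float, required=True)
    parser.add_argument("--b", type=float, required=True)
    # The output is returned by writing a file at the path provided.
    parser.add_argument("--sum-output-path", required=True)
    args = parser.parse_args(argv)

    os.makedirs(os.path.dirname(args.sum_output_path), exist_ok=True)
    with open(args.sum_output_path, "w") as output_file:
        output_file.write(str(args.a + args.b))


# Simulate one invocation with a temporary output location.
output_path = os.path.join(tempfile.mkdtemp(), "outputs", "sum")
main(["--a", "7", "--b", "4", "--sum-output-path", output_path])

with open(output_path) as output_file:
    written_value = output_file.read()
print(written_value)
```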

Python function-based components make it easier to build pipeline components by building the component specification for you. Python function-based components also handle the complexity of passing inputs into your component and passing your function’s outputs back to your pipeline.

Component inputs and outputs are classified as either parameters or artifacts, depending on their data type.

  • Parameters typically represent settings that affect the behavior of your pipeline. Parameters are passed into your component by value, and can be of any of the following types: int, double, float, or str. Since parameters are passed by value, the quantity of data passed in a parameter must be appropriate to pass as a command-line argument.

  • Artifacts represent large or complex data structures like datasets or models, and are passed into components as a reference to a file path.

    If you have large amounts of string data to pass to your component, such as a JSON file, annotate that input or output as a type of Artifact, such as Dataset, to let Kubeflow Pipelines know to pass this to your component as a file.

    In addition to the artifact’s data, you can also read and write the artifact’s metadata. For output artifacts, you can record metadata as key-value pairs, such as the accuracy of a trained model. For input artifacts, you can read the artifact’s metadata — for example, you could use metadata to decide if a model is accurate enough to deploy for predictions.

All outputs are returned as files, using the paths that Kubeflow Pipelines provides.

The following sections describe how to pass parameters and artifacts to your function.

Passing parameters by value

Python function-based components make it easier to pass parameters between components by value (such as numbers, booleans, and short strings), by letting you define your component’s interface by annotating your Python function. Parameters can be of any type that is appropriate to pass as a command-line argument, such as int, float, double, or str.

If your component returns multiple outputs by value, annotate your function with the typing.NamedTuple type hint and use the collections.namedtuple function to return your function’s outputs as a new subclass of tuple.

The following example demonstrates how to return multiple outputs by value.

from typing import NamedTuple

@component
def multiple_return_values_example(a: float, b: float) -> NamedTuple(
  'ExampleOutputs',
  [
    ('sum', float),
    ('product', float)
  ]):
  """Example function that demonstrates how to return multiple values."""  
  sum_value = a + b
  product_value = a * b

  from collections import namedtuple
  example_output = namedtuple('ExampleOutputs', ['sum', 'product'])
  return example_output(sum_value, product_value)

Passing artifacts by file

Python function-based components make it easier to pass files to your component, or to return files from your component, by letting you annotate your Python function’s arguments as artifacts. Artifacts represent large or complex data structures like datasets or models, and are passed into components as a reference to a file path.

In addition to the artifact’s data, you can also read and write the artifact’s metadata. For output artifacts, you can record metadata as key-value pairs, such as the accuracy of a trained model. For input artifacts, you can read the artifact’s metadata — for example, you could use metadata to decide if a model is accurate enough to deploy for predictions.
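
The metadata pattern described above can be sketched without the SDK. In the example below, `StandInArtifact` is a hypothetical stand-in that only mimics the path and metadata properties; in a real component the arguments would be annotated as output and input artifacts instead, and the accuracy value is made up for the demonstration.

```python
import os
import tempfile
from dataclasses import dataclass, field


@dataclass
class StandInArtifact:
    """Hypothetical stand-in exposing only `path` and `metadata`."""
    path: str
    metadata: dict = field(default_factory=dict)


def train(model):
    """Writes the artifact's data and records metadata about it."""
    with open(model.path, "w") as model_file:
        model_file.write("placeholder for model weights")
    model.metadata["accuracy"] = 0.93  # made-up value for illustration


def should_deploy(model, threshold=0.9):
    """Reads the artifact's metadata to decide whether to deploy."""
    return model.metadata.get("accuracy", 0.0) >= threshold


model = StandInArtifact(path=os.path.join(tempfile.mkdtemp(), "model"))
train(model)
print(should_deploy(model))
print(should_deploy(model, threshold=0.95))
```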

If your artifact is an output file, Kubeflow Pipelines passes your function a path or stream that you can use to store your output file. This path is a location within your pipeline’s pipeline_root that your component can write to.

The following example accepts a file as an input and returns two files as outputs.

@component
def split_text_lines(
    source: Input[Dataset],
    odd_lines: Output[Dataset],
    even_lines: Output[Dataset]):
    """Splits a text file into two files, with even lines going to one file
    and odd lines to the other."""

    with open(source.path, 'r') as reader:
        with open(odd_lines.path, 'w') as odd_writer:
            # Output artifacts are objects; write to their `path` property.
            with open(even_lines.path, 'w') as even_writer:
                while True:
                    line = reader.readline()
                    if line == "":
                        break
                    odd_writer.write(line)
                    line = reader.readline()
                    if line == "":
                        break
                    even_writer.write(line)

In this example, the inputs and outputs are defined as arguments of the split_text_lines function. This lets Kubeflow Pipelines pass the path to the source data file and the paths to the output data files into the function.
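
Before wrapping logic like this in a component, it can be useful to exercise it locally. The sketch below reimplements the same splitting rule as a plain function and uses `types.SimpleNamespace` objects as hypothetical stand-ins for artifacts, since the function only needs a path attribute.

```python
import os
import tempfile
from types import SimpleNamespace


def split_lines(source, odd_lines, even_lines):
    """Writes lines 1, 3, 5, ... to odd_lines and 2, 4, 6, ... to even_lines."""
    with open(source.path) as reader, \
            open(odd_lines.path, "w") as odd_writer, \
            open(even_lines.path, "w") as even_writer:
        for index, line in enumerate(reader):
            # index is 0-based, so an even index is an odd line number.
            writer = odd_writer if index % 2 == 0 else even_writer
            writer.write(line)


workdir = tempfile.mkdtemp()
source = SimpleNamespace(path=os.path.join(workdir, "source.txt"))
odd = SimpleNamespace(path=os.path.join(workdir, "odd.txt"))
even = SimpleNamespace(path=os.path.join(workdir, "even.txt"))

with open(source.path, "w") as source_file:
    source_file.write("one\ntwo\nthree\nfour\nfive\n")

split_lines(source, odd, even)

with open(odd.path) as odd_file:
    odd_result = odd_file.read().split()
with open(even.path) as even_file:
    even_result = even_file.read().split()
print(odd_result, even_result)
```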

To accept a file as an input parameter, use one of the following type annotations:

  • kfp.v2.dsl.Input: Use this generic type hint to specify that your function expects this argument to be an Artifact. Your function can use the argument’s path property to get the artifact’s path, and the metadata property to read its key/value metadata.
  • kfp.components.InputBinaryFile: Use this annotation to specify that your function expects an argument to be an io.BytesIO instance that this function can read.
  • kfp.components.InputPath: Use this annotation to specify that your function expects an argument to be the path to the input file as a string.
  • kfp.components.InputTextFile: Use this annotation to specify that your function expects an argument to be an io.TextIOWrapper instance that this function can read.

To return a file as an output, use one of the following type annotations:

  • kfp.v2.dsl.Output: Use this generic type hint to specify that your function expects this argument to be an Artifact. Your function can use the argument’s path property to get the artifact path to write to, and the metadata property to log key/value metadata.
  • kfp.components.OutputBinaryFile: Use this annotation to specify that your function expects an argument to be an io.BytesIO instance that this function can write to.
  • kfp.components.OutputPath: Use this annotation to specify that your function expects an argument to be the path to store the output file at as a string.
  • kfp.components.OutputTextFile: Use this annotation to specify that your function expects an argument to be an io.TextIOWrapper that this function can write to.
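
The difference between the path-style and file-style annotations is easiest to see in the function bodies. The following sketch uses plain Python with no SDK annotations, so the function names and the use of `io.StringIO` as a writable text stream are assumptions for illustration: the first function receives paths as strings, as with InputPath and OutputPath, and the second receives already opened text streams, as with InputTextFile and OutputTextFile.

```python
import io
import os
import tempfile


def count_lines(text_path: str, count_path: str):
    """Path style: the function opens and closes the files itself."""
    with open(text_path) as reader:
        count = sum(1 for _ in reader)
    with open(count_path, "w") as writer:
        writer.write(str(count))


def to_upper(text_file, upper_file):
    """File style: the function reads and writes streams it is handed."""
    for line in text_file:
        upper_file.write(line.upper())


workdir = tempfile.mkdtemp()
text_path = os.path.join(workdir, "text.txt")
count_path = os.path.join(workdir, "count.txt")
with open(text_path, "w") as text_file:
    text_file.write("a\nb\n")

count_lines(text_path, count_path)
with open(count_path) as count_file:
    line_count = count_file.read()

upper_buffer = io.StringIO()  # stand-in for a writable text stream
with open(text_path) as text_file:
    to_upper(text_file, upper_buffer)
upper_text = upper_buffer.getvalue()

print(line_count, repr(upper_text))
```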

Example Python function-based component

This section demonstrates how to build a Python function-based component that uses imports and helper functions and produces multiple outputs.

  1. Define your function. This example function uses the numpy package to calculate the quotient and remainder for a given dividend and divisor in a helper function. In addition to the quotient and remainder, the function also returns two metrics.

    By adding the @component decorator, you convert your function into a factory function that creates pipeline steps that execute this function. This example also specifies the base container image to run your component in.

from typing import NamedTuple

@component(base_image='tensorflow/tensorflow:1.11.0-py3')
def my_divmod(
  dividend: float,
  divisor: float,
  metrics: Output[Metrics]) -> NamedTuple(
    'MyDivmodOutput',
    [
      ('quotient', float),
      ('remainder', float),
    ]):
    '''Divides two numbers and calculates the quotient and remainder'''

    # Import the numpy package inside the component function
    import numpy as np

    # Define a helper function
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)

    (quotient, remainder) = divmod_helper(dividend, divisor)

    # Export two metrics
    metrics.log_metric('quotient', float(quotient))
    metrics.log_metric('remainder', float(remainder))

    from collections import namedtuple
    divmod_output = namedtuple('MyDivmodOutput',
        ['quotient', 'remainder'])
    return divmod_output(quotient, remainder)
  2. Define your pipeline. This example pipeline uses the my_divmod factory function and the add factory function from an earlier example.
import kfp.dsl as dsl
@dsl.pipeline(
   name='calculation-pipeline',
   description='An example pipeline that performs arithmetic calculations.',
   pipeline_root='gs://my-pipeline-root/example-pipeline'
)
def calc_pipeline(
   a: float=1,
   b: float=7,
   c: float=17,
):
    # Passes a pipeline parameter and a constant value as operation arguments.
    # The `add` factory function returns a dsl.ContainerOp class instance.
    add_task = add(a, 4)

    # Passes the output of the add_task and a pipeline parameter as operation
    # arguments. For an operation with a single return value, the output
    # reference is accessed using `task.output` or
    # `task.outputs['output_name']`.
    divmod_task = my_divmod(add_task.output, b)

    # For an operation with multiple return values, output references are
    # accessed as `task.outputs['output_name']`.
    result_task = add(divmod_task.outputs['quotient'], c)
  3. Compile and run your pipeline. Learn more about compiling and running pipelines.
# Specify pipeline argument values
arguments = {'a': 7, 'b': 8}

# Submit a pipeline run
client.create_run_from_pipeline_func(
    calc_pipeline,
    arguments=arguments,
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE)
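
To sanity-check what this run should compute, the same arithmetic can be traced in plain Python. This sketch assumes the built-in divmod as a stand-in for np.divmod and uses the argument values above, with c left at its default of 17; the function names here are local stand-ins, not pipeline components.

```python
def add(a: float, b: float) -> float:
    """Same arithmetic as the `add` component."""
    return a + b


def plain_divmod(dividend: float, divisor: float):
    """Built-in divmod, used here in place of np.divmod."""
    return divmod(dividend, divisor)


a, b, c = 7.0, 8.0, 17.0  # c is the pipeline's default value

add_result = add(a, 4)                                # 7 + 4
quotient, remainder = plain_divmod(add_result, b)     # 11 divided by 8
final_result = add(quotient, c)                       # quotient + 17

print(add_result, quotient, remainder, final_result)
```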