From 4bbe41aeae84272b8d4240d8273d6780cb7b2696 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Wed, 8 Jun 2022 01:39:39 -0700 Subject: [PATCH 01/15] feat: Added the PipelineJob.submit_from_pipeline_func method --- google/cloud/aiplatform/pipeline_jobs.py | 148 +++++++++++++++++- google/cloud/aiplatform/utils/gcs_utils.py | 51 +++++- .../utils/resource_manager_utils.py | 26 +++ 3 files changed, 223 insertions(+), 2 deletions(-) diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index f6fcc3a0af..0a8ca4df91 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -19,7 +19,8 @@ import logging import time import re -from typing import Any, Dict, List, Optional, Union +import tempfile +from typing import Any, Callable, Dict, List, Optional, Union from google.auth import credentials as auth_credentials from google.cloud.aiplatform import base @@ -31,6 +32,7 @@ from google.cloud.aiplatform.metadata import constants as metadata_constants from google.cloud.aiplatform.metadata import experiment_resources from google.cloud.aiplatform.metadata import utils as metadata_utils +from google.cloud.aiplatform.utils import gcs_utils from google.cloud.aiplatform.utils import yaml_utils from google.cloud.aiplatform.utils import pipeline_utils from google.protobuf import json_format @@ -776,3 +778,147 @@ def clone( ) return cloned + + @staticmethod + def submit_from_pipeline_func( + # Parameters for the PipelineJob constructor + pipeline_func: Callable, + arguments: Optional[Dict[str, Any]] = None, + output_artifacts_gcs_dir: Optional[str] = None, + enable_caching: Optional[bool] = None, + context_name: Optional[str] = "pipeline", + display_name: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + job_id: Optional[str] = None, + # Parameters for the PipelineJob.submit method + service_account: Optional[str] = None, + network: Optional[str] = None, + create_request_timeout: Optional[float] = None, + # Parameters for the Vertex SDK + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + encryption_spec_key_name: Optional[str] = None, + ) -> "PipelineJob": + """Creates pipelineJob from a pipeline function and submits it for execution. + + Args: + pipeline_func (Callable): + Required. A pipeline function to compile. + A pipeline function creates instances of components and connects + component inputs to outputs. + arguments (Dict[str, Any]): + Optional. The mapping from runtime parameter names to its values that + control the pipeline run. + output_artifacts_gcs_dir (str): + Optional. The GCS location of the pipeline outputs. + A GCS bucket for artifacts will be created if not specified. + enable_caching (bool): + Optional. Whether to turn on caching for the run. + + If this is not set, defaults to the compile time settings, which + are True for all tasks by default, while users may specify + different caching options for individual tasks. + + If this is set, the setting applies to all tasks in the pipeline. + + Overrides the compile time settings. + context_name (str): + Optional. The name of metadata context. Used for cached execution reuse. + display_name (str): + Optional. The user-defined name of this Pipeline. + labels (Dict[str, str]): + Optional. The user defined metadata to organize PipelineJob. + job_id (str): + Optional. The unique ID of the job run. + If not specified, pipeline name + timestamp will be used. 
+ + service_account (str): + Optional. Specifies the service account for workload run-as account. + Users submitting jobs must have act-as permission on this run-as account. + network (str): + Optional. The full name of the Compute Engine network to which the job + should be peered. For example, projects/12345/global/networks/myVPC. + + Private services access must already be configured for the network. + If left unspecified, the job is not peered with any network. + create_request_timeout (float): + Optional. The timeout for the create request in seconds. + + project (str): + Optional. The project that you want to run this PipelineJob in. If not set, + the project set in aiplatform.init will be used. + location (str): + Optional. Location to create PipelineJob. If not set, + location set in aiplatform.init will be used. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to create this PipelineJob. + Overrides credentials set in aiplatform.init. + encryption_spec_key_name (str): + Optional. The Cloud KMS resource identifier of the customer + managed encryption key used to protect the job. Has the + form: + ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. + The key needs to be in the same region as where the compute + resource is created. + + If this is set, then all + resources created by the PipelineJob will + be encrypted with the provided encryption key. + + Overrides encryption_spec_key_name set in aiplatform.init. + + Returns: + A Vertex AI PipelineJob. + + Raises: + ValueError: If job_id or labels have incorrect format. + """ + + # Importing the KFP module here to prevent import errors when the kfp package is not installed. + from kfp.v2 import compiler as compiler_v2 + + if not output_artifacts_gcs_dir: + output_artifacts_gcs_dir = ( + gcs_utils.create_gcs_directory_for_pipeline_artifacts( + service_account=service_account, + project=project, + location=location, + credentials=credentials, + ) + ) + + automatic_display_name = ( + pipeline_func.__name__.replace("_", " ") + + " " + + datetime.datetime.now().isoformat(sep=" ") + ) + display_name = display_name or automatic_display_name + job_id = job_id or re.sub( + r"[^-a-z0-9]", "-", automatic_display_name.lower() + ).strip("-") + pipeline_file = tempfile.mktemp(suffix=".json") + compiler_v2.Compiler().compile( + pipeline_func=pipeline_func, + pipeline_name=context_name, + package_path=pipeline_file, + ) + pipeline_job = PipelineJob( + template_path=pipeline_file, + parameter_values=arguments, + pipeline_root=output_artifacts_gcs_dir, + enable_caching=enable_caching, + display_name=display_name, + job_id=job_id, + labels=labels, + project=project, + location=location, + credentials=credentials, + encryption_spec_key_name=encryption_spec_key_name, + ) + pipeline_job.submit( + service_account=service_account, + network=network, + create_request_timeout=create_request_timeout, + ) + return pipeline_job diff --git a/google/cloud/aiplatform/utils/gcs_utils.py b/google/cloud/aiplatform/utils/gcs_utils.py index 855b7991f1..2add436e00 100644 --- a/google/cloud/aiplatform/utils/gcs_utils.py +++ b/google/cloud/aiplatform/utils/gcs_utils.py @@ -25,7 +25,7 @@ from google.cloud import storage from google.cloud.aiplatform import initializer - +from google.cloud.aiplatform.utils import resource_manager_utils _logger = logging.getLogger(__name__) @@ -163,3 +163,52 @@ def stage_local_data_in_gcs( ) return staged_data_uri + + +def create_gcs_directory_for_pipeline_artifacts( + 
service_account: Optional[str] = None,
+    project: Optional[str] = None,
+    location: Optional[str] = None,
+    credentials: Optional[auth_credentials.Credentials] = None,
+):
+    project = project or initializer.global_config.project
+    location = location or initializer.global_config.location
+    credentials = credentials or initializer.global_config.credentials
+
+    pipelines_bucket_name = project + "-vertex-pipelines-" + location
+    output_artifacts_gcs_dir = "gs://" + pipelines_bucket_name + "/output_artifacts/"
+    # Creating the bucket if needed
+    storage_client = storage.Client(
+        project=project,
+        credentials=credentials,
+    )
+    pipelines_bucket = storage.Bucket(
+        client=storage_client,
+        name=pipelines_bucket_name,
+    )
+    if not pipelines_bucket.exists():
+        _logger.info(
+            f'Creating GCS bucket for Vertex Pipelines "{pipelines_bucket_name}"'
+        )
+        pipelines_bucket = storage_client.create_bucket(
+            bucket_or_name=pipelines_bucket,
+            project=project,
+            location=location,
+        )
+    # Giving the service account read and write access to teh new bucket
+    # Workaround for error: "Failed to create pipeline job. Error: Service account `NNNNNNNN-compute@developer.gserviceaccount.com`
+    # does not have `[storage.objects.get, storage.objects.create]` IAM permission(s) to the bucket `xxxxxxxx-vertex-pipelines-us-central1`.
+    # Please either copy the files to the Google Cloud Storage bucket owned by your project, or grant the required IAM permission(s) to the service account."
+    if not service_account:
+        # Getting the project number to use in service account
+        project_number = resource_manager_utils.get_project_number(project)
+        service_account = f"{project_number}-compute@developer.gserviceaccount.com"
+    bucket_iam_policy = pipelines_bucket.get_iam_policy()
+    bucket_iam_policy.setdefault("roles/storage.objectCreator", set()).add(
+        f"serviceAccount:{service_account}"
+    )
+    bucket_iam_policy.setdefault("roles/storage.objectViewer", set()).add(
+        f"serviceAccount:{service_account}"
+    )
+    pipelines_bucket.set_iam_policy(bucket_iam_policy)
+    return output_artifacts_gcs_dir
diff --git a/google/cloud/aiplatform/utils/resource_manager_utils.py b/google/cloud/aiplatform/utils/resource_manager_utils.py
index f918c766bf..e521cfff14 100644
--- a/google/cloud/aiplatform/utils/resource_manager_utils.py
+++ b/google/cloud/aiplatform/utils/resource_manager_utils.py
@@ -48,3 +48,29 @@ def get_project_id(
     project = projects_client.get_project(name=f"projects/{project_number}")
 
     return project.project_id
+
+def get_project_number(
+    project_id: str,
+    credentials: Optional[auth_credentials.Credentials] = None,
+) -> str:
+    """Gets the project number given the project ID.
+
+    Args:
+        project_id (str):
+            Required. Google Cloud project unique ID.
+        credentials: The custom credentials to use when making API calls.
+            Optional. If not provided, default credentials will be used.
+
+    Returns:
+        str - The automatically generated unique numerical identifier for your GCP project.
+ + """ + + credentials = credentials or initializer.global_config.credentials + + projects_client = resourcemanager.ProjectsClient(credentials=credentials) + + project = projects_client.get_project(name=f"projects/{project_id}") + project_number = project.name.split("/", 1)[1] + + return project_number From 35a9375e33d62059a2ba186255c386c862e1289b Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Thu, 23 Jun 2022 01:44:12 -0700 Subject: [PATCH 02/15] Bumped the copyright year --- google/cloud/aiplatform/utils/gcs_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/utils/gcs_utils.py b/google/cloud/aiplatform/utils/gcs_utils.py index 2add436e00..37669583dd 100644 --- a/google/cloud/aiplatform/utils/gcs_utils.py +++ b/google/cloud/aiplatform/utils/gcs_utils.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2021 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From cb2c17501427bc70e4cb0cf63bd52e5f54bd32ac Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Thu, 23 Jun 2022 01:44:35 -0700 Subject: [PATCH 03/15] Added docstring --- google/cloud/aiplatform/utils/gcs_utils.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/google/cloud/aiplatform/utils/gcs_utils.py b/google/cloud/aiplatform/utils/gcs_utils.py index 37669583dd..03ca8bec87 100644 --- a/google/cloud/aiplatform/utils/gcs_utils.py +++ b/google/cloud/aiplatform/utils/gcs_utils.py @@ -171,6 +171,21 @@ def create_gcs_directory_for_pipeline_artifacts( location: Optional[str] = None, credentials: Optional[auth_credentials.Credentials] = None, ): + """Gets or creates the GCS directory for Vertex Pipelines artifacts. + + Args: + service_account: Optional. Google Cloud service account that will be used + to run the pipelines. If this function creates a new bucket it will give + permission to the specified service account to access the bucket. + If not provided, the Google Cloud Compute Engine service account will be used. + project: Optional. Google Cloud Project that contains the staging bucket. + location: Optional. Google Cloud location to use for the staging bucket. + credentials: The custom credentials to use when making API calls. + If not provided, default credentials will be used. + + Returns: + Google Cloud Storage URI of the staged data. + """ project = project or initializer.global_config.project location = location or initializer.global_config.location credentials = credentials or initializer.global_config.credentials From 279d7f2d381a0039f7bb192bd77b41513229fd1c Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Thu, 23 Jun 2022 01:45:28 -0700 Subject: [PATCH 04/15] Raising more informative exceptions when the kfp compiler module cannot be loaded --- google/cloud/aiplatform/pipeline_jobs.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index 0a8ca4df91..a384f1878c 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -876,7 +876,12 @@ def submit_from_pipeline_func( """ # Importing the KFP module here to prevent import errors when the kfp package is not installed. - from kfp.v2 import compiler as compiler_v2 + try: + from kfp.v2 import compiler as compiler_v2 + except ImportError as err: + raise RuntimeError( + "Cannot import the kfp.v2.compiler module. 
Please install or update the kfp package." + ) from err if not output_artifacts_gcs_dir: output_artifacts_gcs_dir = ( From 9040503a9f201b27c083943d25989cd1cbcc8b5d Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Thu, 23 Jun 2022 01:46:54 -0700 Subject: [PATCH 05/15] Split the submit_from_pipeline_func into from_pipeline_func and submit --- google/cloud/aiplatform/pipeline_jobs.py | 21 +++------------------ 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index a384f1878c..27691a1f6d 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -780,7 +780,7 @@ def clone( return cloned @staticmethod - def submit_from_pipeline_func( + def from_pipeline_func( # Parameters for the PipelineJob constructor pipeline_func: Callable, arguments: Optional[Dict[str, Any]] = None, @@ -790,17 +790,15 @@ def submit_from_pipeline_func( display_name: Optional[str] = None, labels: Optional[Dict[str, str]] = None, job_id: Optional[str] = None, - # Parameters for the PipelineJob.submit method + # Parameters for the bucket creation service_account: Optional[str] = None, - network: Optional[str] = None, - create_request_timeout: Optional[float] = None, # Parameters for the Vertex SDK project: Optional[str] = None, location: Optional[str] = None, credentials: Optional[auth_credentials.Credentials] = None, encryption_spec_key_name: Optional[str] = None, ) -> "PipelineJob": - """Creates pipelineJob from a pipeline function and submits it for execution. + """Creates PipelineJob by compiling a pipeline function. Args: pipeline_func (Callable): @@ -836,14 +834,6 @@ def submit_from_pipeline_func( service_account (str): Optional. Specifies the service account for workload run-as account. Users submitting jobs must have act-as permission on this run-as account. - network (str): - Optional. The full name of the Compute Engine network to which the job - should be peered. For example, projects/12345/global/networks/myVPC. - - Private services access must already be configured for the network. - If left unspecified, the job is not peered with any network. - create_request_timeout (float): - Optional. The timeout for the create request in seconds. project (str): Optional. The project that you want to run this PipelineJob in. 
If not set, @@ -921,9 +911,4 @@ def submit_from_pipeline_func( credentials=credentials, encryption_spec_key_name=encryption_spec_key_name, ) - pipeline_job.submit( - service_account=service_account, - network=network, - create_request_timeout=create_request_timeout, - ) return pipeline_job From 0c251311ea1cd7b1d211e5c2157ce6e17f707542 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 28 Jun 2022 02:22:10 -0700 Subject: [PATCH 06/15] Fixed the formatting --- google/cloud/aiplatform/utils/resource_manager_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/google/cloud/aiplatform/utils/resource_manager_utils.py b/google/cloud/aiplatform/utils/resource_manager_utils.py index e521cfff14..e6cbc0988b 100644 --- a/google/cloud/aiplatform/utils/resource_manager_utils.py +++ b/google/cloud/aiplatform/utils/resource_manager_utils.py @@ -49,6 +49,7 @@ def get_project_id( return project.project_id + def get_project_number( project_id: str, credentials: Optional[auth_credentials.Credentials] = None, From 5498f42397c914c3e5c8aa9566d0ac9e2a876476 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 28 Jun 2022 02:30:06 -0700 Subject: [PATCH 07/15] Refactored the change to only create the artifact bucket when submit is called It's advised to not perform any complex operations (especially making external changes) as part of constructors of factory methods. --- google/cloud/aiplatform/pipeline_jobs.py | 30 ++++++------- google/cloud/aiplatform/utils/gcs_utils.py | 50 +++++++++++++++++++--- 2 files changed, 57 insertions(+), 23 deletions(-) diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index 27691a1f6d..b93c46ffdc 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -226,6 +226,13 @@ def __init__( or pipeline_job["pipelineSpec"].get("defaultPipelineRoot") or initializer.global_config.staging_bucket ) + pipeline_root = ( + pipeline_root + or gcs_utils.generate_gcs_directory_for_pipeline_artifacts( + project=project, + location=location, + ) + ) builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json( pipeline_job ) @@ -339,6 +346,13 @@ def submit( if network: self._gca_resource.network = network + gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist( + output_artifacts_gcs_dir=self.pipeline_spec.get("gcsOutputDirectory"), + service_account=self._gca_resource.service_account, + project=self.project, + location=self.location, + ) + # Prevents logs from being supressed on TFX pipelines if self._gca_resource.pipeline_spec.get("sdkVersion", "").startswith("tfx"): _LOGGER.setLevel(logging.INFO) @@ -790,8 +804,6 @@ def from_pipeline_func( display_name: Optional[str] = None, labels: Optional[Dict[str, str]] = None, job_id: Optional[str] = None, - # Parameters for the bucket creation - service_account: Optional[str] = None, # Parameters for the Vertex SDK project: Optional[str] = None, location: Optional[str] = None, @@ -831,10 +843,6 @@ def from_pipeline_func( Optional. The unique ID of the job run. If not specified, pipeline name + timestamp will be used. - service_account (str): - Optional. Specifies the service account for workload run-as account. - Users submitting jobs must have act-as permission on this run-as account. - project (str): Optional. The project that you want to run this PipelineJob in. If not set, the project set in aiplatform.init will be used. @@ -873,16 +881,6 @@ def from_pipeline_func( "Cannot import the kfp.v2.compiler module. 
Please install or update the kfp package."
             ) from err
 
-        if not output_artifacts_gcs_dir:
-            output_artifacts_gcs_dir = (
-                gcs_utils.create_gcs_directory_for_pipeline_artifacts(
-                    service_account=service_account,
-                    project=project,
-                    location=location,
-                    credentials=credentials,
-                )
-            )
-
         automatic_display_name = (
             pipeline_func.__name__.replace("_", " ")
             + " "
diff --git a/google/cloud/aiplatform/utils/gcs_utils.py b/google/cloud/aiplatform/utils/gcs_utils.py
index 03ca8bec87..15aa9970bb 100644
--- a/google/cloud/aiplatform/utils/gcs_utils.py
+++ b/google/cloud/aiplatform/utils/gcs_utils.py
@@ -165,7 +165,33 @@ def stage_local_data_in_gcs(
     return staged_data_uri
 
 
-def create_gcs_directory_for_pipeline_artifacts(
+def generate_gcs_directory_for_pipeline_artifacts(
+    project: Optional[str] = None,
+    location: Optional[str] = None,
+):
+    """Generates the GCS directory URI for Vertex Pipelines artifacts.
+
+    Args:
+        project: Optional. Google Cloud Project that contains the staging bucket.
+            If not set, the project set in aiplatform.init will be used.
+        location: Optional. Google Cloud location to use for the staging bucket.
+            If not set, the location set in aiplatform.init will be used.
+
+    Returns:
+        Google Cloud Storage URI of the directory for pipeline artifacts.
+        The bucket itself is created on first use by
+        create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist.
+    """
+    project = project or initializer.global_config.project
+    location = location or initializer.global_config.location
+
+    pipelines_bucket_name = project + "-vertex-pipelines-" + location
+    output_artifacts_gcs_dir = "gs://" + pipelines_bucket_name + "/output_artifacts/"
+    return output_artifacts_gcs_dir
+
+
+def create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
+    output_artifacts_gcs_dir: Optional[str] = None,
     service_account: Optional[str] = None,
     project: Optional[str] = None,
     location: Optional[str] = None,
     credentials: Optional[auth_credentials.Credentials] = None,
 ):
     """Gets or creates the GCS directory for Vertex Pipelines artifacts.
 
     Args:
+        output_artifacts_gcs_dir: Optional. The GCS location for the pipeline outputs.
+            It will be generated if not specified.
         service_account: Optional. Google Cloud service account that will be used
             to run the pipelines. If this function creates a new bucket it will give
             permission to the specified service account to access the bucket.
@@ -190,20 +218,28 @@ def create_gcs_directory_for_pipeline_artifacts( location = location or initializer.global_config.location credentials = credentials or initializer.global_config.credentials - pipelines_bucket_name = project + "-vertex-pipelines-" + location - output_artifacts_gcs_dir = "gs://" + pipelines_bucket_name + "/output_artifacts/" + output_artifacts_gcs_dir = ( + output_artifacts_gcs_dir + or generate_gcs_directory_for_pipeline_artifacts( + project=project, + location=location, + ) + ) + # Creating the bucket if needed storage_client = storage.Client( project=project, credentials=credentials, ) - pipelines_bucket = storage.Bucket( + + pipelines_bucket = storage.Blob.from_string( + uri=output_artifacts_gcs_dir, client=storage_client, - name=pipelines_bucket_name, - ) + ).bucket + if not pipelines_bucket.exists(): _logger.info( - f'Creating GCS bucket for Vertex Pipelines "{pipelines_bucket_name}"' + f'Creating GCS bucket for Vertex Pipelines: "{pipelines_bucket.name}"' ) pipelines_bucket = storage_client.create_bucket( bucket_or_name=pipelines_bucket, From b3a99c6d424748af1a4ac288ad9eb88168768d08 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Fri, 1 Jul 2022 19:09:42 -0700 Subject: [PATCH 08/15] Renamed the arguments parameter to parameter_values for consistency with PipelineJob Unfortunately, the parameter_values parameter name itself was chosen inconsistently with the KFP SDK and ComponentSpec that both use the "arguments" naming. --- google/cloud/aiplatform/pipeline_jobs.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index b93c46ffdc..2ad5c5a8bc 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -797,7 +797,7 @@ def clone( def from_pipeline_func( # Parameters for the PipelineJob constructor pipeline_func: Callable, - arguments: Optional[Dict[str, Any]] = None, + parameter_values: Optional[Dict[str, Any]] = None, output_artifacts_gcs_dir: Optional[str] = None, enable_caching: Optional[bool] = None, context_name: Optional[str] = "pipeline", @@ -817,7 +817,7 @@ def from_pipeline_func( Required. A pipeline function to compile. A pipeline function creates instances of components and connects component inputs to outputs. - arguments (Dict[str, Any]): + parameter_values (Dict[str, Any]): Optional. The mapping from runtime parameter names to its values that control the pipeline run. 
output_artifacts_gcs_dir (str): @@ -898,7 +898,7 @@ def from_pipeline_func( ) pipeline_job = PipelineJob( template_path=pipeline_file, - parameter_values=arguments, + parameter_values=parameter_values, pipeline_root=output_artifacts_gcs_dir, enable_caching=enable_caching, display_name=display_name, From c0b95e794d9a81f38136151820a2674661425ffb Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Fri, 1 Jul 2022 19:26:39 -0700 Subject: [PATCH 09/15] Using the PipelineJob.from_pipeline_func method in tests --- tests/system/aiplatform/test_experiments.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tests/system/aiplatform/test_experiments.py b/tests/system/aiplatform/test_experiments.py index c65cb0c723..cd8a8ee205 100644 --- a/tests/system/aiplatform/test_experiments.py +++ b/tests/system/aiplatform/test_experiments.py @@ -262,7 +262,6 @@ def test_run_context_manager(self): def test_add_pipeline_job_to_experiment(self, shared_state): import kfp.v2.dsl as dsl - import kfp.v2.compiler as compiler from kfp.v2.dsl import component, Metrics, Output @component @@ -276,13 +275,9 @@ def trainer( def pipeline(learning_rate: float, dropout_rate: float): trainer(learning_rate=learning_rate, dropout_rate=dropout_rate) - compiler.Compiler().compile( - pipeline_func=pipeline, package_path="pipeline.json" - ) - - job = aiplatform.PipelineJob( + job = aiplatform.PipelineJob.from_pipeline_func( + pipeline_func=pipeline, display_name=self._make_display_name("experiment pipeline job"), - template_path="pipeline.json", job_id=self._pipeline_job_id, pipeline_root=f'gs://{shared_state["staging_bucket_name"]}', parameter_values={"learning_rate": 0.1, "dropout_rate": 0.2}, From 9c1e1ac68bdf7e091d7a32d0caf1fb9a44064ba3 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Fri, 1 Jul 2022 19:27:30 -0700 Subject: [PATCH 10/15] Added a PipelineJob.from_pipeline_func integration test --- tests/system/aiplatform/test_pipeline_job.py | 59 ++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 tests/system/aiplatform/test_pipeline_job.py diff --git a/tests/system/aiplatform/test_pipeline_job.py b/tests/system/aiplatform/test_pipeline_job.py new file mode 100644 index 0000000000..90fdee8e2a --- /dev/null +++ b/tests/system/aiplatform/test_pipeline_job.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import pytest + +from google.cloud import aiplatform +from tests.system.aiplatform import e2e_base + + +@pytest.mark.usefixtures("tear_down_resources") +class TestExperiments(e2e_base.TestEndToEnd): + + _temp_prefix = "tmpvrtxsdk-e2e" + + def test_add_pipeline_job_to_experiment(self, shared_state): + from kfp.v2 import components + + # Components: + def train( + number_of_epochs: int, + learning_rate: float, + ): + print(f"number_of_epochs={number_of_epochs}") + print(f"learning_rate={learning_rate}") + + train_op = components.create_component_from_func(train) + + # Pipeline: + def training_pipeline(number_of_epochs: int): + train_op( + number_of_epochs=number_of_epochs, + learning_rate="0.1", + ) + + # Submitting the pipeline: + aiplatform.init( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + ) + job = aiplatform.PipelineJob.from_pipeline_func(pipeline_func=training_pipeline) + job.submit() + + shared_state["resources"].append(job) + + job.wait() From 2f4b70e6458b05795a1cd16620577708924ba555 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Fri, 1 Jul 2022 22:40:22 -0700 Subject: [PATCH 11/15] Added artifact bucket mock to the unit tests --- tests/unit/aiplatform/test_pipeline_jobs.py | 41 +++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index 5f6b24fd09..54d5131400 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -31,6 +31,7 @@ from google.cloud.aiplatform import initializer from google.cloud.aiplatform import pipeline_jobs from google.cloud.aiplatform.compat.types import pipeline_failure_policy +from google.cloud.aiplatform.utils import gcs_utils from google.cloud import storage from google.protobuf import json_format @@ -204,6 +205,31 @@ def mock_pipeline_service_create(): yield mock_create_pipeline_job +@pytest.fixture +def mock_pipeline_bucket_exists(): + def mock_create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist( + output_artifacts_gcs_dir=None, + service_account=None, + project=None, + location=None, + credentials=None, + ): + output_artifacts_gcs_dir = ( + output_artifacts_gcs_dir + or gcs_utils.generate_gcs_directory_for_pipeline_artifacts( + project=project, + location=location, + ) + ) + return output_artifacts_gcs_dir + + with mock.patch( + "google.cloud.aiplatform.utils.gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist", + new=mock_create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist, + ) as mock_context: + yield mock_context + + def make_pipeline_job(state): return gca_pipeline_job.PipelineJob( name=_TEST_PIPELINE_JOB_NAME, @@ -322,6 +348,7 @@ def test_run_call_pipeline_service_create( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, sync, @@ -399,6 +426,7 @@ def test_run_call_pipeline_service_create_artifact_registry( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, mock_request_urlopen, job_spec, mock_load_yaml_and_json, @@ -482,6 +510,7 @@ def test_run_call_pipeline_service_create_with_timeout( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, sync, @@ -563,6 +592,7 @@ def test_run_call_pipeline_service_create_with_timeout_not_explicitly_set( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, 
job_spec, mock_load_yaml_and_json, sync, @@ -644,6 +674,7 @@ def test_run_call_pipeline_service_create_with_failure_policy( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, failure_policy, @@ -728,6 +759,7 @@ def test_run_call_pipeline_service_create_legacy( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, sync, @@ -809,6 +841,7 @@ def test_run_call_pipeline_service_create_tfx( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, sync, @@ -886,6 +919,7 @@ def test_submit_call_pipeline_service_pipeline_job_create( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, ): @@ -961,6 +995,7 @@ def test_done_method_pipeline_service( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, ): @@ -999,6 +1034,7 @@ def test_submit_call_pipeline_service_pipeline_job_create_legacy( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, ): @@ -1079,6 +1115,7 @@ def test_get_pipeline_job(self, mock_pipeline_service_get): @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", + "mock_pipeline_bucket_exists", ) @pytest.mark.parametrize( "job_spec", @@ -1116,6 +1153,7 @@ def test_cancel_pipeline_job( @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", + "mock_pipeline_bucket_exists", ) @pytest.mark.parametrize( "job_spec", @@ -1190,6 +1228,7 @@ def test_cancel_pipeline_job_without_running( @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get_with_fail", + "mock_pipeline_bucket_exists", ) @pytest.mark.parametrize( "job_spec", @@ -1230,6 +1269,7 @@ def test_clone_pipeline_job( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, ): @@ -1307,6 +1347,7 @@ def test_clone_pipeline_job_with_all_args( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, ): From 85909e8b301d8470cac8a129a8381c5faac94015 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Sat, 2 Jul 2022 01:38:07 -0700 Subject: [PATCH 12/15] Fixed the PipelineJob.from_pipeline_func integration test --- tests/system/aiplatform/test_pipeline_job.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/system/aiplatform/test_pipeline_job.py b/tests/system/aiplatform/test_pipeline_job.py index 90fdee8e2a..62e0306f47 100644 --- a/tests/system/aiplatform/test_pipeline_job.py +++ b/tests/system/aiplatform/test_pipeline_job.py @@ -27,7 +27,7 @@ class TestExperiments(e2e_base.TestEndToEnd): _temp_prefix = "tmpvrtxsdk-e2e" def test_add_pipeline_job_to_experiment(self, shared_state): - from kfp.v2 import components + from kfp import components # Components: def train( @@ -40,7 +40,7 @@ def train( train_op = components.create_component_from_func(train) # Pipeline: - def training_pipeline(number_of_epochs: int): + def training_pipeline(number_of_epochs: int = 10): train_op( number_of_epochs=number_of_epochs, learning_rate="0.1", @@ -51,9 +51,11 @@ def training_pipeline(number_of_epochs: int): project=e2e_base._PROJECT, 
location=e2e_base._LOCATION, ) - job = aiplatform.PipelineJob.from_pipeline_func(pipeline_func=training_pipeline) + job = aiplatform.PipelineJob.from_pipeline_func( + pipeline_func=training_pipeline, + ) job.submit() - shared_state["resources"].append(job) + shared_state.setdefault("resources", []).append(job) job.wait() From 5cbcd8ba8f7e99217c4e927e5da5c26681731d94 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 12 Jul 2022 11:39:23 -0700 Subject: [PATCH 13/15] Implemented the review feedback --- google/cloud/aiplatform/pipeline_jobs.py | 13 ++++++++----- google/cloud/aiplatform/utils/gcs_utils.py | 2 +- tests/system/aiplatform/test_pipeline_job.py | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index 2ad5c5a8bc..cadd9965d3 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -138,7 +138,9 @@ def __init__( Optional. The unique ID of the job run. If not specified, pipeline name + timestamp will be used. pipeline_root (str): - Optional. The root of the pipeline outputs. Default to be staging bucket. + Optional. The root of the pipeline outputs. If not set, the staging bucket + set in aiplatform.init will be used. If that's not set a pipeline-specific + artifacts bucket will be used. parameter_values (Dict[str, Any]): Optional. The mapping from runtime parameter names to its values that control the pipeline run. @@ -881,10 +883,11 @@ def from_pipeline_func( "Cannot import the kfp.v2.compiler module. Please install or update the kfp package." ) from err - automatic_display_name = ( - pipeline_func.__name__.replace("_", " ") - + " " - + datetime.datetime.now().isoformat(sep=" ") + automatic_display_name = " ".join( + [ + pipeline_func.__name__.replace("_", " "), + datetime.datetime.now().isoformat(sep=" "), + ] ) display_name = display_name or automatic_display_name job_id = job_id or re.sub( diff --git a/google/cloud/aiplatform/utils/gcs_utils.py b/google/cloud/aiplatform/utils/gcs_utils.py index 15aa9970bb..6079d7908d 100644 --- a/google/cloud/aiplatform/utils/gcs_utils.py +++ b/google/cloud/aiplatform/utils/gcs_utils.py @@ -246,7 +246,7 @@ def create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist( project=project, location=location, ) - # Giving the service account read and write access to teh new bucket + # Giving the service account read and write access to the new bucket # Workaround for error: "Failed to create pipeline job. Error: Service account `NNNNNNNN-compute@developer.gserviceaccount.com` # does not have `[storage.objects.get, storage.objects.create]` IAM permission(s) to the bucket `xxxxxxxx-vertex-pipelines-us-central1`. # Please either copy the files to the Google Cloud Storage bucket owned by your project, or grant the required IAM permission(s) to the service account." 
diff --git a/tests/system/aiplatform/test_pipeline_job.py b/tests/system/aiplatform/test_pipeline_job.py index 62e0306f47..004ad768a3 100644 --- a/tests/system/aiplatform/test_pipeline_job.py +++ b/tests/system/aiplatform/test_pipeline_job.py @@ -22,7 +22,7 @@ @pytest.mark.usefixtures("tear_down_resources") -class TestExperiments(e2e_base.TestEndToEnd): +class TestPipelineJob(e2e_base.TestEndToEnd): _temp_prefix = "tmpvrtxsdk-e2e" From 56247e8b62c99e3e5fc831819140738a55406896 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 12 Jul 2022 11:41:13 -0700 Subject: [PATCH 14/15] Reverting the test_experiments.py changes --- tests/system/aiplatform/test_experiments.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/system/aiplatform/test_experiments.py b/tests/system/aiplatform/test_experiments.py index cd8a8ee205..c65cb0c723 100644 --- a/tests/system/aiplatform/test_experiments.py +++ b/tests/system/aiplatform/test_experiments.py @@ -262,6 +262,7 @@ def test_run_context_manager(self): def test_add_pipeline_job_to_experiment(self, shared_state): import kfp.v2.dsl as dsl + import kfp.v2.compiler as compiler from kfp.v2.dsl import component, Metrics, Output @component @@ -275,9 +276,13 @@ def trainer( def pipeline(learning_rate: float, dropout_rate: float): trainer(learning_rate=learning_rate, dropout_rate=dropout_rate) - job = aiplatform.PipelineJob.from_pipeline_func( - pipeline_func=pipeline, + compiler.Compiler().compile( + pipeline_func=pipeline, package_path="pipeline.json" + ) + + job = aiplatform.PipelineJob( display_name=self._make_display_name("experiment pipeline job"), + template_path="pipeline.json", job_id=self._pipeline_job_id, pipeline_root=f'gs://{shared_state["staging_bucket_name"]}', parameter_values={"learning_rate": 0.1, "dropout_rate": 0.2}, From bb04e46dd9a4bab1be9484a8327f26f6f4c75bd8 Mon Sep 17 00:00:00 2001 From: Alexey Volkov Date: Tue, 12 Jul 2022 14:22:36 -0700 Subject: [PATCH 15/15] Fixed the tests added after this PR was created --- tests/unit/aiplatform/test_pipeline_jobs.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index 5cceab9e04..63c09ac799 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -1538,6 +1538,7 @@ def test_get_associated_experiment_from_pipeline_returns_none_without_experiment self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, ): @@ -1582,6 +1583,7 @@ def test_get_associated_experiment_from_pipeline_returns_experiment( get_metadata_store_mock, mock_create_pipeline_job_with_experiment, mock_get_pipeline_job_with_experiment, + mock_pipeline_bucket_exists, ): aiplatform.init( project=_TEST_PROJECT,
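Usage sketch: the snippet below strings the new API together end to end, mirroring the integration test added in tests/system/aiplatform/test_pipeline_job.py. It is a minimal sketch assuming the final state of this patch series (patches 01-15) with a kfp<2.0 installation; the "my-project"/"us-central1" values and the parameter_values override are illustrative placeholders, not part of the patches.

```python
# Minimal end-to-end sketch of PipelineJob.from_pipeline_func. The component
# and pipeline mirror the integration test above; project/location and the
# parameter override are assumed placeholders.
from kfp import components

from google.cloud import aiplatform


def train(number_of_epochs: int, learning_rate: float):
    print(f"number_of_epochs={number_of_epochs}")
    print(f"learning_rate={learning_rate}")


train_op = components.create_component_from_func(train)


def training_pipeline(number_of_epochs: int = 10):
    train_op(number_of_epochs=number_of_epochs, learning_rate="0.1")


aiplatform.init(project="my-project", location="us-central1")

# Compiles the pipeline function into a temporary job spec. No GCS calls
# happen yet; per patch 07, the gs://<project>-vertex-pipelines-<location>
# artifacts bucket is only created (and granted objectCreator/objectViewer
# for the run-as service account) when submit() is called.
job = aiplatform.PipelineJob.from_pipeline_func(
    pipeline_func=training_pipeline,
    parameter_values={"number_of_epochs": 2},
)
job.submit()
job.wait()
```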