From 9c1eb6aacca7a82777e33a3921f92a9c456a1927 Mon Sep 17 00:00:00 2001 From: Michael Hu Date: Sat, 21 May 2022 15:33:11 -0400 Subject: [PATCH 1/8] feat: Add Vertex Forecasting E2E test. --- .../datasets/time_series_dataset.py | 4 + tests/system/aiplatform/test_dataset.py | 25 ++++ .../system/aiplatform/test_e2e_forecasting.py | 126 ++++++++++++++++++ 3 files changed, 155 insertions(+) create mode 100644 tests/system/aiplatform/test_e2e_forecasting.py diff --git a/google/cloud/aiplatform/datasets/time_series_dataset.py b/google/cloud/aiplatform/datasets/time_series_dataset.py index ec5546f12a..6bde6be7a5 100644 --- a/google/cloud/aiplatform/datasets/time_series_dataset.py +++ b/google/cloud/aiplatform/datasets/time_series_dataset.py @@ -46,6 +46,7 @@ def create( labels: Optional[Dict[str, str]] = None, encryption_spec_key_name: Optional[str] = None, sync: bool = True, + create_request_timeout: Optional[float] = None, ) -> "TimeSeriesDataset": """Creates a new time series dataset. @@ -102,6 +103,8 @@ def create( Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will be immediately returned and synced when the Future has completed. + create_request_timeout (float): + Optional. The timeout for the create request in seconds. Returns: time_series_dataset (TimeSeriesDataset): @@ -141,6 +144,7 @@ def create( encryption_spec_key_name=encryption_spec_key_name ), sync=sync, + create_request_timeout=create_request_timeout, ) def import_data(self): diff --git a/tests/system/aiplatform/test_dataset.py b/tests/system/aiplatform/test_dataset.py index 54e2528e1f..8003ec411e 100644 --- a/tests/system/aiplatform/test_dataset.py +++ b/tests/system/aiplatform/test_dataset.py @@ -306,6 +306,31 @@ def test_create_tabular_dataset_from_dataframe_with_provided_schema( finally: tabular_dataset.delete() + def test_create_time_series_dataset(self): + """Use the Dataset.create() method to create a new time series dataset. + Then confirm the dataset was successfully created and references GCS source.""" + + try: + time_series_dataset = aiplatform.TimeSeriesDataset.create( + display_name=self._make_display_name(key="create_time_series_dataset"), + gcs_source=[_TEST_TABULAR_CLASSIFICATION_GCS_SOURCE], + create_request_timeout=None, + ) + + gapic_metadata = time_series_dataset.to_dict()["metadata"] + gcs_source_uris = gapic_metadata["inputConfig"]["gcsSource"]["uri"] + + assert len(gcs_source_uris) == 1 + assert _TEST_TABULAR_CLASSIFICATION_GCS_SOURCE == gcs_source_uris[0] + assert ( + time_series_dataset.metadata_schema_uri + == aiplatform.schema.dataset.metadata.time_series + ) + + finally: + if time_series_dataset is not None: + time_series_dataset.delete() + def test_export_data(self, storage_client, staging_bucket): """Get an existing dataset, export data to a newly created folder in Google Cloud Storage, then verify data was successfully exported.""" diff --git a/tests/system/aiplatform/test_e2e_forecasting.py b/tests/system/aiplatform/test_e2e_forecasting.py new file mode 100644 index 0000000000..4dd77926e4 --- /dev/null +++ b/tests/system/aiplatform/test_e2e_forecasting.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +from urllib import request + +import pytest + +from google.cloud import aiplatform +from google.cloud.aiplatform.compat.types import ( + job_state as gca_job_state, + pipeline_state as gca_pipeline_state, +) +from tests.system.aiplatform import e2e_base + +_TRAINING_DATASET_BQ_PATH = ( + "bq://bigquery-public-data:iowa_liquor_sales_forecasting.2020_sales_train" +) +_PREDICTION_DATASET_BQ_PATH = ( + "bq://bigquery-public-data:iowa_liquor_sales_forecasting.2021_sales_predict" +) + + +@pytest.mark.usefixtures("prepare_staging_bucket", "delete_staging_bucket") +class TestEndToEndForecasting(e2e_base.TestEndToEnd): + """End to end system test of the Vertex SDK with forecasting data.""" + + _temp_prefix = "temp-vertex-sdk-e2e-forecasting" + + def test_end_to_end_forecasting(self, shared_state): + """Builds a dataset, trains models, and gets batch predictions.""" + + # Collection of resources generated by this test, to be deleted during + # teardown. + shared_state["resources"] = [] + + aiplatform.init( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + staging_bucket=shared_state["staging_bucket_name"], + ) + + # Create and import to single managed dataset for both training jobs. + ds = aiplatform.TimeSeriesDataset.create( + display_name=self._make_display_name("dataset"), + bq_source=[_TRAINING_DATASET_BQ_PATH], + sync=False, + create_request_timeout=180.0, + ) + shared_state["resources"].extend([ds]) + + time_column = "date" + time_series_identifier_column = "store_name" + target_column = "sale_dollars" + column_specs = { + time_column: "timestamp", + target_column: "numeric", + "city": "categorical", + "zip_code": "categorical", + "county": "categorical", + } + + # Define both training jobs + # TODO(humichael): Add seq2seq job. + automl_job = aiplatform.AutoMLForecastingTrainingJob( + display_name=self._make_display_name("train-housing-automl"), + optimization_objective="minimize-rmse", + column_specs=column_specs, + ) + + # Kick off both training jobs, AutoML job will take approx one hour to + # run. + automl_model = automl_job.run( + dataset=ds, + target_column=target_column, + time_column=time_column, + time_series_identifier_column=time_series_identifier_column, + available_at_forecast_columns=[time_column], + unavailable_at_forecast_columns=[target_column], + time_series_attribute_columns=["city", "zip_code", "county"], + forecast_horizon=30, + context_window=30, + data_granularity_unit="day", + data_granularity_count=1, + budget_milli_node_hours=1000, + model_display_name=self._make_display_name("automl-liquor-model"), + sync=False, + ) + shared_state["resources"].extend([automl_job, automl_model]) + + automl_batch_prediction_job = automl_model.batch_predict( + job_display_name=self._make_display_name("automl-liquor-model"), + instances_format="bigquery", + machine_type="n1-standard-4", + bigquery_source=_PREDICTION_DATASET_BQ_PATH, + gcs_destination_prefix=( + f'gs://{shared_state["staging_bucket_name"]}/bp_results/' + ), + sync=False, + ) + shared_state["resources"].append(automl_batch_prediction_job) + + automl_batch_prediction_job.wait() + + assert ( + automl_job.state + == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + assert ( + automl_batch_prediction_job.state + == gca_job_state.JobState.JOB_STATE_SUCCEEDED + ) From 195f73219d118a8344ab02892faf61630b64b82e Mon Sep 17 00:00:00 2001 From: Michael Hu Date: Sat, 21 May 2022 15:47:30 -0400 Subject: [PATCH 2/8] update time series dataset test data source --- tests/system/aiplatform/test_dataset.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/system/aiplatform/test_dataset.py b/tests/system/aiplatform/test_dataset.py index 8003ec411e..978b444ead 100644 --- a/tests/system/aiplatform/test_dataset.py +++ b/tests/system/aiplatform/test_dataset.py @@ -51,6 +51,7 @@ ) _TEST_DATASET_DISPLAY_NAME = "permanent_50_flowers_dataset" _TEST_TABULAR_CLASSIFICATION_GCS_SOURCE = "gs://ucaip-sample-resources/iris_1000.csv" +_TEST_FORECASTING_BQ_SOURCE = "bq://bigquery-public-data:iowa_liquor_sales_forecasting.2020_sales_train" _TEST_TEXT_ENTITY_EXTRACTION_GCS_SOURCE = f"gs://{TEST_BUCKET}/ai-platform-unified/sdk/datasets/text_entity_extraction_dataset.jsonl" _TEST_IMAGE_OBJECT_DETECTION_GCS_SOURCE = ( "gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl" @@ -313,15 +314,14 @@ def test_create_time_series_dataset(self): try: time_series_dataset = aiplatform.TimeSeriesDataset.create( display_name=self._make_display_name(key="create_time_series_dataset"), - gcs_source=[_TEST_TABULAR_CLASSIFICATION_GCS_SOURCE], + bq_source=[_TEST_FORECASTING_BQ_SOURCE], create_request_timeout=None, ) gapic_metadata = time_series_dataset.to_dict()["metadata"] - gcs_source_uris = gapic_metadata["inputConfig"]["gcsSource"]["uri"] + bq_source_uri = gapic_metadata["inputConfig"]["bigquerySource"]["uri"] - assert len(gcs_source_uris) == 1 - assert _TEST_TABULAR_CLASSIFICATION_GCS_SOURCE == gcs_source_uris[0] + assert _TEST_FORECASTING_BQ_SOURCE == bq_source_uri assert ( time_series_dataset.metadata_schema_uri == aiplatform.schema.dataset.metadata.time_series From e6c28da3046d4ca26cb11c4067c3ad1f75a00a89 Mon Sep 17 00:00:00 2001 From: Michael Hu Date: Mon, 23 May 2022 10:44:09 -0400 Subject: [PATCH 3/8] fix linting errors --- tests/system/aiplatform/test_dataset.py | 4 +++- .../system/aiplatform/test_e2e_forecasting.py | 20 +++++-------------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/tests/system/aiplatform/test_dataset.py b/tests/system/aiplatform/test_dataset.py index 978b444ead..a2d487db44 100644 --- a/tests/system/aiplatform/test_dataset.py +++ b/tests/system/aiplatform/test_dataset.py @@ -51,7 +51,9 @@ ) _TEST_DATASET_DISPLAY_NAME = "permanent_50_flowers_dataset" _TEST_TABULAR_CLASSIFICATION_GCS_SOURCE = "gs://ucaip-sample-resources/iris_1000.csv" -_TEST_FORECASTING_BQ_SOURCE = "bq://bigquery-public-data:iowa_liquor_sales_forecasting.2020_sales_train" +_TEST_FORECASTING_BQ_SOURCE = ( + "bq://bigquery-public-data:iowa_liquor_sales_forecasting.2020_sales_train" +) _TEST_TEXT_ENTITY_EXTRACTION_GCS_SOURCE = f"gs://{TEST_BUCKET}/ai-platform-unified/sdk/datasets/text_entity_extraction_dataset.jsonl" _TEST_IMAGE_OBJECT_DETECTION_GCS_SOURCE = ( "gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl" diff --git a/tests/system/aiplatform/test_e2e_forecasting.py b/tests/system/aiplatform/test_e2e_forecasting.py index 4dd77926e4..21b7249b7e 100644 --- a/tests/system/aiplatform/test_e2e_forecasting.py +++ b/tests/system/aiplatform/test_e2e_forecasting.py @@ -15,16 +15,10 @@ # limitations under the License. # -import os -from urllib import request - -import pytest - from google.cloud import aiplatform -from google.cloud.aiplatform.compat.types import ( - job_state as gca_job_state, - pipeline_state as gca_pipeline_state, -) +from google.cloud.aiplatform.compat.types import job_state +from google.cloud.aiplatform.compat.types import pipeline_state +import pytest from tests.system.aiplatform import e2e_base _TRAINING_DATASET_BQ_PATH = ( @@ -116,11 +110,7 @@ def test_end_to_end_forecasting(self, shared_state): automl_batch_prediction_job.wait() + assert automl_job.state == pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED assert ( - automl_job.state - == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED - ) - assert ( - automl_batch_prediction_job.state - == gca_job_state.JobState.JOB_STATE_SUCCEEDED + automl_batch_prediction_job.state == job_state.JobState.JOB_STATE_SUCCEEDED ) From e9c5b410ec629c34848bd0725b52d2aec3471d03 Mon Sep 17 00:00:00 2001 From: Michael Hu Date: Tue, 24 May 2022 15:34:08 -0400 Subject: [PATCH 4/8] clean up resources --- .../system/aiplatform/test_e2e_forecasting.py | 150 +++++++++--------- 1 file changed, 79 insertions(+), 71 deletions(-) diff --git a/tests/system/aiplatform/test_e2e_forecasting.py b/tests/system/aiplatform/test_e2e_forecasting.py index 21b7249b7e..cb5fd2a169 100644 --- a/tests/system/aiplatform/test_e2e_forecasting.py +++ b/tests/system/aiplatform/test_e2e_forecasting.py @@ -38,79 +38,87 @@ class TestEndToEndForecasting(e2e_base.TestEndToEnd): def test_end_to_end_forecasting(self, shared_state): """Builds a dataset, trains models, and gets batch predictions.""" - # Collection of resources generated by this test, to be deleted during - # teardown. - shared_state["resources"] = [] - aiplatform.init( project=e2e_base._PROJECT, location=e2e_base._LOCATION, staging_bucket=shared_state["staging_bucket_name"], ) - - # Create and import to single managed dataset for both training jobs. - ds = aiplatform.TimeSeriesDataset.create( - display_name=self._make_display_name("dataset"), - bq_source=[_TRAINING_DATASET_BQ_PATH], - sync=False, - create_request_timeout=180.0, - ) - shared_state["resources"].extend([ds]) - - time_column = "date" - time_series_identifier_column = "store_name" - target_column = "sale_dollars" - column_specs = { - time_column: "timestamp", - target_column: "numeric", - "city": "categorical", - "zip_code": "categorical", - "county": "categorical", - } - - # Define both training jobs - # TODO(humichael): Add seq2seq job. - automl_job = aiplatform.AutoMLForecastingTrainingJob( - display_name=self._make_display_name("train-housing-automl"), - optimization_objective="minimize-rmse", - column_specs=column_specs, - ) - - # Kick off both training jobs, AutoML job will take approx one hour to - # run. - automl_model = automl_job.run( - dataset=ds, - target_column=target_column, - time_column=time_column, - time_series_identifier_column=time_series_identifier_column, - available_at_forecast_columns=[time_column], - unavailable_at_forecast_columns=[target_column], - time_series_attribute_columns=["city", "zip_code", "county"], - forecast_horizon=30, - context_window=30, - data_granularity_unit="day", - data_granularity_count=1, - budget_milli_node_hours=1000, - model_display_name=self._make_display_name("automl-liquor-model"), - sync=False, - ) - shared_state["resources"].extend([automl_job, automl_model]) - - automl_batch_prediction_job = automl_model.batch_predict( - job_display_name=self._make_display_name("automl-liquor-model"), - instances_format="bigquery", - machine_type="n1-standard-4", - bigquery_source=_PREDICTION_DATASET_BQ_PATH, - gcs_destination_prefix=( - f'gs://{shared_state["staging_bucket_name"]}/bp_results/' - ), - sync=False, - ) - shared_state["resources"].append(automl_batch_prediction_job) - - automl_batch_prediction_job.wait() - - assert automl_job.state == pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED - assert ( - automl_batch_prediction_job.state == job_state.JobState.JOB_STATE_SUCCEEDED - ) + try: + # Create and import to single managed dataset for both training + # jobs. + ds = aiplatform.TimeSeriesDataset.create( + display_name=self._make_display_name("dataset"), + bq_source=[_TRAINING_DATASET_BQ_PATH], + sync=False, + create_request_timeout=180.0, + ) + + time_column = "date" + time_series_identifier_column = "store_name" + target_column = "sale_dollars" + column_specs = { + time_column: "timestamp", + target_column: "numeric", + "city": "categorical", + "zip_code": "categorical", + "county": "categorical", + } + + # Define both training jobs + # TODO(humichael): Add seq2seq job. + automl_job = aiplatform.AutoMLForecastingTrainingJob( + display_name=self._make_display_name("train-housing-automl"), + optimization_objective="minimize-rmse", + column_specs=column_specs, + ) + + # Kick off both training jobs, AutoML job will take approx one hour + # to run. + automl_model = automl_job.run( + dataset=ds, + target_column=target_column, + time_column=time_column, + time_series_identifier_column=time_series_identifier_column, + available_at_forecast_columns=[time_column], + unavailable_at_forecast_columns=[target_column], + time_series_attribute_columns=["city", "zip_code", "county"], + forecast_horizon=30, + context_window=30, + data_granularity_unit="day", + data_granularity_count=1, + budget_milli_node_hours=1000, + model_display_name=self._make_display_name("automl-liquor-model"), + sync=False, + ) + + automl_batch_prediction_job = automl_model.batch_predict( + job_display_name=self._make_display_name("automl-liquor-model"), + instances_format="bigquery", + machine_type="n1-standard-4", + bigquery_source=_PREDICTION_DATASET_BQ_PATH, + gcs_destination_prefix=( + f'gs://{shared_state["staging_bucket_name"]}/bp_results/' + ), + sync=False, + ) + + automl_batch_prediction_job.wait() + + assert ( + automl_job.state + == pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + assert ( + automl_batch_prediction_job.state + == job_state.JobState.JOB_STATE_SUCCEEDED + ) + finally: + resources = [ + "ds", + "automl_job", + "automl_model", + "automl_batch_prediction_job", + ] + for resource in resources: + if resource in locals(): + locals()[resource].delete() From e3e69a42dbb79eab80aa7a8c4a6b8a3fd6663841 Mon Sep 17 00:00:00 2001 From: Michael Hu Date: Tue, 24 May 2022 16:29:01 -0400 Subject: [PATCH 5/8] change data source --- tests/system/aiplatform/test_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/aiplatform/test_dataset.py b/tests/system/aiplatform/test_dataset.py index a2d487db44..7cd3c0416c 100644 --- a/tests/system/aiplatform/test_dataset.py +++ b/tests/system/aiplatform/test_dataset.py @@ -52,7 +52,7 @@ _TEST_DATASET_DISPLAY_NAME = "permanent_50_flowers_dataset" _TEST_TABULAR_CLASSIFICATION_GCS_SOURCE = "gs://ucaip-sample-resources/iris_1000.csv" _TEST_FORECASTING_BQ_SOURCE = ( - "bq://bigquery-public-data:iowa_liquor_sales_forecasting.2020_sales_train" + "bq://ucaip-sample-tests:ucaip_test_us_central1.2020_sales_train" ) _TEST_TEXT_ENTITY_EXTRACTION_GCS_SOURCE = f"gs://{TEST_BUCKET}/ai-platform-unified/sdk/datasets/text_entity_extraction_dataset.jsonl" _TEST_IMAGE_OBJECT_DETECTION_GCS_SOURCE = ( From f247c5d21dc47a7988a3a0095f9d4ec7d7f0fa6f Mon Sep 17 00:00:00 2001 From: Michael Hu Date: Tue, 24 May 2022 18:06:17 -0400 Subject: [PATCH 6/8] instantiate variables --- .../system/aiplatform/test_e2e_forecasting.py | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/tests/system/aiplatform/test_e2e_forecasting.py b/tests/system/aiplatform/test_e2e_forecasting.py index cb5fd2a169..63a441e757 100644 --- a/tests/system/aiplatform/test_e2e_forecasting.py +++ b/tests/system/aiplatform/test_e2e_forecasting.py @@ -37,6 +37,10 @@ class TestEndToEndForecasting(e2e_base.TestEndToEnd): def test_end_to_end_forecasting(self, shared_state): """Builds a dataset, trains models, and gets batch predictions.""" + ds = None + automl_job = None + automl_model = None + automl_batch_prediction_job = None aiplatform.init( project=e2e_base._PROJECT, @@ -113,12 +117,11 @@ def test_end_to_end_forecasting(self, shared_state): == job_state.JobState.JOB_STATE_SUCCEEDED ) finally: - resources = [ - "ds", - "automl_job", - "automl_model", - "automl_batch_prediction_job", - ] - for resource in resources: - if resource in locals(): - locals()[resource].delete() + if ds is not None: + ds.delete() + if automl_job is not None: + automl_job.delete() + if automl_model is not None: + automl_model.delete() + if automl_batch_prediction_job is not None: + automl_batch_prediction_job.delete() From 5caba45ca1fe1c105e85d5d0be5e94fe08f7d510 Mon Sep 17 00:00:00 2001 From: Michael Hu Date: Tue, 24 May 2022 19:30:48 -0400 Subject: [PATCH 7/8] oops --- tests/system/aiplatform/test_e2e_forecasting.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/system/aiplatform/test_e2e_forecasting.py b/tests/system/aiplatform/test_e2e_forecasting.py index 63a441e757..097a6d0bb1 100644 --- a/tests/system/aiplatform/test_e2e_forecasting.py +++ b/tests/system/aiplatform/test_e2e_forecasting.py @@ -118,10 +118,10 @@ def test_end_to_end_forecasting(self, shared_state): ) finally: if ds is not None: - ds.delete() + ds.delete() if automl_job is not None: - automl_job.delete() + automl_job.delete() if automl_model is not None: - automl_model.delete() + automl_model.delete() if automl_batch_prediction_job is not None: - automl_batch_prediction_job.delete() + automl_batch_prediction_job.delete() From 604e85a8c107cece2ab764ab614295d44f03a64e Mon Sep 17 00:00:00 2001 From: Michael Hu Date: Fri, 27 May 2022 11:30:05 -0400 Subject: [PATCH 8/8] update bq uris --- tests/system/aiplatform/test_e2e_forecasting.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system/aiplatform/test_e2e_forecasting.py b/tests/system/aiplatform/test_e2e_forecasting.py index 097a6d0bb1..b0f3e19711 100644 --- a/tests/system/aiplatform/test_e2e_forecasting.py +++ b/tests/system/aiplatform/test_e2e_forecasting.py @@ -22,10 +22,10 @@ from tests.system.aiplatform import e2e_base _TRAINING_DATASET_BQ_PATH = ( - "bq://bigquery-public-data:iowa_liquor_sales_forecasting.2020_sales_train" + "bq://ucaip-sample-tests:ucaip_test_us_central1.2020_sales_train" ) _PREDICTION_DATASET_BQ_PATH = ( - "bq://bigquery-public-data:iowa_liquor_sales_forecasting.2021_sales_predict" + "bq://ucaip-sample-tests:ucaip_test_us_central1.2021_sales_predict" )