Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions google/cloud/aiplatform/datasets/time_series_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def create(
labels: Optional[Dict[str, str]] = None,
encryption_spec_key_name: Optional[str] = None,
sync: bool = True,
create_request_timeout: Optional[float] = None,
) -> "TimeSeriesDataset":
"""Creates a new time series dataset.

Expand Down Expand Up @@ -102,6 +103,8 @@ def create(
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
create_request_timeout (float):
Optional. The timeout for the create request in seconds.

Returns:
time_series_dataset (TimeSeriesDataset):
Expand Down Expand Up @@ -141,6 +144,7 @@ def create(
encryption_spec_key_name=encryption_spec_key_name
),
sync=sync,
create_request_timeout=create_request_timeout,
)

def import_data(self):
Expand Down
27 changes: 27 additions & 0 deletions tests/system/aiplatform/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
)
_TEST_DATASET_DISPLAY_NAME = "permanent_50_flowers_dataset"
_TEST_TABULAR_CLASSIFICATION_GCS_SOURCE = "gs://ucaip-sample-resources/iris_1000.csv"
_TEST_FORECASTING_BQ_SOURCE = (
Comment thread
TheMichaelHu marked this conversation as resolved.
"bq://ucaip-sample-tests:ucaip_test_us_central1.2020_sales_train"
)
_TEST_TEXT_ENTITY_EXTRACTION_GCS_SOURCE = f"gs://{TEST_BUCKET}/ai-platform-unified/sdk/datasets/text_entity_extraction_dataset.jsonl"
_TEST_IMAGE_OBJECT_DETECTION_GCS_SOURCE = (
"gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"
Expand Down Expand Up @@ -306,6 +309,30 @@ def test_create_tabular_dataset_from_dataframe_with_provided_schema(
finally:
tabular_dataset.delete()

def test_create_time_series_dataset(self):
Comment thread
TheMichaelHu marked this conversation as resolved.
"""Use the Dataset.create() method to create a new time series dataset.
Then confirm the dataset was successfully created and references GCS source."""

try:
time_series_dataset = aiplatform.TimeSeriesDataset.create(
display_name=self._make_display_name(key="create_time_series_dataset"),
bq_source=[_TEST_FORECASTING_BQ_SOURCE],
create_request_timeout=None,
)

gapic_metadata = time_series_dataset.to_dict()["metadata"]
bq_source_uri = gapic_metadata["inputConfig"]["bigquerySource"]["uri"]

assert _TEST_FORECASTING_BQ_SOURCE == bq_source_uri
assert (
time_series_dataset.metadata_schema_uri
== aiplatform.schema.dataset.metadata.time_series
)

finally:
if time_series_dataset is not None:
time_series_dataset.delete()

def test_export_data(self, storage_client, staging_bucket):
"""Get an existing dataset, export data to a newly created folder in
Google Cloud Storage, then verify data was successfully exported."""
Expand Down
127 changes: 127 additions & 0 deletions tests/system/aiplatform/test_e2e_forecasting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from google.cloud import aiplatform
from google.cloud.aiplatform.compat.types import job_state
from google.cloud.aiplatform.compat.types import pipeline_state
import pytest
from tests.system.aiplatform import e2e_base

_TRAINING_DATASET_BQ_PATH = (
Comment thread
TheMichaelHu marked this conversation as resolved.
"bq://ucaip-sample-tests:ucaip_test_us_central1.2020_sales_train"
)
_PREDICTION_DATASET_BQ_PATH = (
"bq://ucaip-sample-tests:ucaip_test_us_central1.2021_sales_predict"
)


@pytest.mark.usefixtures("prepare_staging_bucket", "delete_staging_bucket")
class TestEndToEndForecasting(e2e_base.TestEndToEnd):
"""End to end system test of the Vertex SDK with forecasting data."""

_temp_prefix = "temp-vertex-sdk-e2e-forecasting"

def test_end_to_end_forecasting(self, shared_state):
"""Builds a dataset, trains models, and gets batch predictions."""
ds = None
automl_job = None
automl_model = None
automl_batch_prediction_job = None

aiplatform.init(
project=e2e_base._PROJECT,
location=e2e_base._LOCATION,
staging_bucket=shared_state["staging_bucket_name"],
Comment thread
TheMichaelHu marked this conversation as resolved.
)
try:
# Create and import to single managed dataset for both training
# jobs.
ds = aiplatform.TimeSeriesDataset.create(
display_name=self._make_display_name("dataset"),
bq_source=[_TRAINING_DATASET_BQ_PATH],
sync=False,
create_request_timeout=180.0,
)

time_column = "date"
time_series_identifier_column = "store_name"
target_column = "sale_dollars"
column_specs = {
time_column: "timestamp",
target_column: "numeric",
"city": "categorical",
"zip_code": "categorical",
"county": "categorical",
}

# Define both training jobs
# TODO(humichael): Add seq2seq job.
automl_job = aiplatform.AutoMLForecastingTrainingJob(
display_name=self._make_display_name("train-housing-automl"),
optimization_objective="minimize-rmse",
column_specs=column_specs,
)

# Kick off both training jobs, AutoML job will take approx one hour
# to run.
automl_model = automl_job.run(
dataset=ds,
target_column=target_column,
time_column=time_column,
time_series_identifier_column=time_series_identifier_column,
available_at_forecast_columns=[time_column],
unavailable_at_forecast_columns=[target_column],
time_series_attribute_columns=["city", "zip_code", "county"],
forecast_horizon=30,
context_window=30,
data_granularity_unit="day",
data_granularity_count=1,
budget_milli_node_hours=1000,
model_display_name=self._make_display_name("automl-liquor-model"),
sync=False,
)

automl_batch_prediction_job = automl_model.batch_predict(
job_display_name=self._make_display_name("automl-liquor-model"),
instances_format="bigquery",
machine_type="n1-standard-4",
bigquery_source=_PREDICTION_DATASET_BQ_PATH,
gcs_destination_prefix=(
f'gs://{shared_state["staging_bucket_name"]}/bp_results/'
),
sync=False,
)

automl_batch_prediction_job.wait()

assert (
automl_job.state
== pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
)
assert (
automl_batch_prediction_job.state
== job_state.JobState.JOB_STATE_SUCCEEDED
)
finally:
if ds is not None:
ds.delete()
if automl_job is not None:
automl_job.delete()
if automl_model is not None:
automl_model.delete()
if automl_batch_prediction_job is not None:
automl_batch_prediction_job.delete()