diff --git a/docs/definition_v1/types.rst b/docs/aiplatform/definition_v1/types.rst similarity index 100% rename from docs/definition_v1/types.rst rename to docs/aiplatform/definition_v1/types.rst diff --git a/docs/definition_v1beta1/types.rst b/docs/aiplatform/definition_v1beta1/types.rst similarity index 100% rename from docs/definition_v1beta1/types.rst rename to docs/aiplatform/definition_v1beta1/types.rst diff --git a/docs/instance_v1/types.rst b/docs/aiplatform/instance_v1/types.rst similarity index 100% rename from docs/instance_v1/types.rst rename to docs/aiplatform/instance_v1/types.rst diff --git a/docs/instance_v1beta1/types.rst b/docs/aiplatform/instance_v1beta1/types.rst similarity index 100% rename from docs/instance_v1beta1/types.rst rename to docs/aiplatform/instance_v1beta1/types.rst diff --git a/docs/params_v1/types.rst b/docs/aiplatform/params_v1/types.rst similarity index 100% rename from docs/params_v1/types.rst rename to docs/aiplatform/params_v1/types.rst diff --git a/docs/params_v1beta1/types.rst b/docs/aiplatform/params_v1beta1/types.rst similarity index 100% rename from docs/params_v1beta1/types.rst rename to docs/aiplatform/params_v1beta1/types.rst diff --git a/docs/prediction_v1/types.rst b/docs/aiplatform/prediction_v1/types.rst similarity index 100% rename from docs/prediction_v1/types.rst rename to docs/aiplatform/prediction_v1/types.rst diff --git a/docs/prediction_v1beta1/types.rst b/docs/aiplatform/prediction_v1beta1/types.rst similarity index 100% rename from docs/prediction_v1beta1/types.rst rename to docs/aiplatform/prediction_v1beta1/types.rst diff --git a/docs/aiplatform.rst b/docs/aiplatform/services.rst similarity index 84% rename from docs/aiplatform.rst rename to docs/aiplatform/services.rst index bf5cd4625b..0d21fe6bd1 100644 --- a/docs/aiplatform.rst +++ b/docs/aiplatform/services.rst @@ -3,4 +3,4 @@ Google Cloud Aiplatform SDK .. 
automodule:: google.cloud.aiplatform :members: - :show-inheritance: \ No newline at end of file + :show-inheritance: diff --git a/docs/aiplatform/types.rst b/docs/aiplatform/types.rst new file mode 100644 index 0000000000..119f762bca --- /dev/null +++ b/docs/aiplatform/types.rst @@ -0,0 +1,13 @@ +Types for Google Cloud Aiplatform SDK API +=========================================== +.. toctree:: + :maxdepth: 2 + + instance_v1 + instance_v1beta1 + params_v1 + params_v1beta1 + prediction_v1 + prediction_v1beta1 + definition_v1 + definition_v1beta1 diff --git a/docs/index.rst b/docs/index.rst index 031271a261..6094720bd8 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -7,7 +7,9 @@ API Reference .. toctree:: :maxdepth: 2 - aiplatform + aiplatform/services + aiplatform/types + aiplatform_v1/services aiplatform_v1/types @@ -22,4 +24,4 @@ For a list of all ``google-cloud-aiplatform`` releases: .. toctree:: :maxdepth: 2 - changelog \ No newline at end of file + changelog diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index fc4f829882..00d6f11780 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2020 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -40,6 +40,7 @@ job_state as gca_job_state, hyperparameter_tuning_job as gca_hyperparameter_tuning_job_compat, machine_resources as gca_machine_resources_compat, + manual_batch_tuning_parameters as gca_manual_batch_tuning_parameters_compat, study as gca_study_compat, ) from google.cloud.aiplatform.constants import base as constants @@ -376,6 +377,7 @@ def create( encryption_spec_key_name: Optional[str] = None, sync: bool = True, create_request_timeout: Optional[float] = None, + batch_size: Optional[int] = None, ) -> "BatchPredictionJob": """Create a batch prediction job. 
@@ -534,6 +536,13 @@ def create( be immediately returned and synced when the Future has completed. create_request_timeout (float): Optional. The timeout for the create request in seconds. + batch_size (int): + Optional. The number of the records (e.g. instances) of the operation given in each batch + to a machine replica. Machine type, and size of a single record should be considered + when setting this parameter, higher value speeds up the batch operation's execution, + but too high value will result in a whole batch not fitting in a machine's memory, + and the whole operation will fail. + The default value is 64. Returns: (jobs.BatchPredictionJob): Instantiated representation of the created batch prediction job. @@ -647,7 +656,14 @@ def create( gapic_batch_prediction_job.dedicated_resources = dedicated_resources - gapic_batch_prediction_job.manual_batch_tuning_parameters = None + manual_batch_tuning_parameters = ( + gca_manual_batch_tuning_parameters_compat.ManualBatchTuningParameters() + ) + manual_batch_tuning_parameters.batch_size = batch_size + + gapic_batch_prediction_job.manual_batch_tuning_parameters = ( + manual_batch_tuning_parameters + ) # User Labels gapic_batch_prediction_job.labels = labels diff --git a/google/cloud/aiplatform/matching_engine/_protos/match_service.proto b/google/cloud/aiplatform/matching_engine/_protos/match_service.proto new file mode 100644 index 0000000000..158b0f146a --- /dev/null +++ b/google/cloud/aiplatform/matching_engine/_protos/match_service.proto @@ -0,0 +1,136 @@ +syntax = "proto3"; + +package google.cloud.aiplatform.container.v1beta1; + +import "google/rpc/status.proto"; + +// MatchService is a Google managed service for efficient vector similarity +// search at scale. +service MatchService { + // Returns the nearest neighbors for the query. If it is a sharded + // deployment, calls the other shards and aggregates the responses. 
+ rpc Match(MatchRequest) returns (MatchResponse) {} + + // Returns the nearest neighbors for batch queries. If it is a sharded + // deployment, calls the other shards and aggregates the responses. + rpc BatchMatch(BatchMatchRequest) returns (BatchMatchResponse) {} +} + +// Parameters for a match query. +message MatchRequest { + // The ID of the DeployedIndex that will serve the request. + // This MatchRequest is sent to a specific IndexEndpoint of the Control API, + // as per the IndexEndpoint.network. That IndexEndpoint also has + // IndexEndpoint.deployed_indexes, and each such index has a + // DeployedIndex.id field. + // The value of the field below must equal one of the DeployedIndex.id + // fields of the IndexEndpoint that is being called for this request. + string deployed_index_id = 1; + + // The embedding values. + repeated float float_val = 2; + + // The number of nearest neighbors to be retrieved from database for + // each query. If not set, will use the default from + // the service configuration. + int32 num_neighbors = 3; + + // The list of restricts. + repeated Namespace restricts = 4; + + // Crowding is a constraint on a neighbor list produced by nearest neighbor + // search requiring that no more than some value k' of the k neighbors + // returned have the same value of crowding_attribute. + // It's used for improving result diversity. + // This field is the maximum number of matches with the same crowding tag. + int32 per_crowding_attribute_num_neighbors = 5; + + // The number of neighbors to find via approximate search before + // exact reordering is performed. If not set, the default value from scam + // config is used; if set, this value must be > 0. + int32 approx_num_neighbors = 6; + + // The fraction of the number of leaves to search, set at query time allows + // user to tune search performance. Increasing this value increases both + // search accuracy and latency. The value should be between 0.0 and 1.0. 
If + // not set or set to 0.0, query uses the default value specified in + // NearestNeighborSearchConfig.TreeAHConfig.leaf_nodes_to_search_percent. + int32 leaf_nodes_to_search_percent_override = 7; +} + +// Response of a match query. +message MatchResponse { + message Neighbor { + // The ids of the matches. + string id = 1; + + // The distances of the matches. + double distance = 2; + } + // All its neighbors. + repeated Neighbor neighbor = 1; +} + +// Parameters for a batch match query. +message BatchMatchRequest { + // Batched requests against one index. + message BatchMatchRequestPerIndex { + // The ID of the DeployedIndex that will serve the request. + string deployed_index_id = 1; + + // The requests against the index identified by the above deployed_index_id. + repeated MatchRequest requests = 2; + + // Selects the optimal batch size to use for low-level batching. Queries + // within each low level batch are executed sequentially while low level + // batches are executed in parallel. + // This field is optional, defaults to 0 if not set. A non-positive number + // disables low level batching, i.e. all queries are executed sequentially. + int32 low_level_batch_size = 3; + } + + // The batch requests grouped by indexes. + repeated BatchMatchRequestPerIndex requests = 1; +} + +// Response of a batch match query. +message BatchMatchResponse { + // Batched responses for one index. + message BatchMatchResponsePerIndex { + // The ID of the DeployedIndex that produced the responses. + string deployed_index_id = 1; + + // The match responses produced by the index identified by the above + // deployed_index_id. This field is set only when the query against that + // index succeeds. + repeated MatchResponse responses = 2; + + // The status of response for the batch query identified by the above + // deployed_index_id. + google.rpc.Status status = 3; + } + + // The batched responses grouped by indexes. 
+ repeated BatchMatchResponsePerIndex responses = 1; +} + +// Namespace specifies the rules for determining the datapoints that are +// eligible for each matching query, overall query is an AND across namespaces. +message Namespace { + // The string name of the namespace that this proto is specifying, + // such as "color", "shape", "geo", or "tags". + string name = 1; + + // The allowed tokens in the namespace. + repeated string allow_tokens = 2; + + // The denied tokens in the namespace. + // The denied tokens have exactly the same format as the token fields, but + // represents a negation. When a token is denied, then matches will be + // excluded whenever the other datapoint has that token. + // + // For example, if a query specifies {color: red, blue, !purple}, then that + // query will match datapoints that are red or blue, but if those points are + // also purple, then they will be excluded even if they are red/blue. + repeated string deny_tokens = 3; +} diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index b15ed791bf..95f3044cbe 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2020 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -2284,6 +2284,7 @@ def batch_predict( encryption_spec_key_name: Optional[str] = None, sync: bool = True, create_request_timeout: Optional[float] = None, + batch_size: Optional[int] = None, ) -> jobs.BatchPredictionJob: """Creates a batch prediction job using this Model and outputs prediction results to the provided destination prefix in the specified @@ -2442,6 +2443,13 @@ def batch_predict( Overrides encryption_spec_key_name set in aiplatform.init. create_request_timeout (float): Optional. The timeout for the create request in seconds. + batch_size (int): + Optional. 
The number of the records (e.g. instances) of the operation given in each batch + to a machine replica. Machine type, and size of a single record should be considered + when setting this parameter, higher value speeds up the batch operation's execution, + but too high value will result in a whole batch not fitting in a machine's memory, + and the whole operation will fail. + The default value is 64. Returns: (jobs.BatchPredictionJob): Instantiated representation of the created batch prediction job. @@ -2462,6 +2470,7 @@ def batch_predict( accelerator_count=accelerator_count, starting_replica_count=starting_replica_count, max_replica_count=max_replica_count, + batch_size=batch_size, generate_explanation=generate_explanation, explanation_metadata=explanation_metadata, explanation_parameters=explanation_parameters, diff --git a/tests/system/aiplatform/test_dataset.py b/tests/system/aiplatform/test_dataset.py index f4033bfb7e..f152ed0e32 100644 --- a/tests/system/aiplatform/test_dataset.py +++ b/tests/system/aiplatform/test_dataset.py @@ -131,7 +131,7 @@ def setup_method(self): @pytest.fixture() def storage_client(self): - yield storage.Client(project=e2e_base._PROJECT) + yield storage.Client(project=_TEST_PROJECT) @pytest.fixture() def staging_bucket(self, storage_client): @@ -174,7 +174,7 @@ def test_get_new_dataset_and_import(self, dataset_gapic_client): try: text_dataset = aiplatform.TextDataset.create( - display_name=f"temp_sdk_integration_test_create_text_dataset_{uuid.uuid4()}", + display_name=self._make_display_name(key="get_new_dataset_and_import"), ) my_dataset = aiplatform.TextDataset(dataset_name=text_dataset.name) @@ -189,7 +189,6 @@ def test_get_new_dataset_and_import(self, dataset_gapic_client): my_dataset.import_data( gcs_source=_TEST_TEXT_ENTITY_EXTRACTION_GCS_SOURCE, import_schema_uri=_TEST_TEXT_ENTITY_IMPORT_SCHEMA, - import_request_timeout=600.0, ) data_items_post_import = dataset_gapic_client.list_data_items( @@ -198,8 +197,7 @@ def 
test_get_new_dataset_and_import(self, dataset_gapic_client): assert len(list(data_items_post_import)) == 469 finally: - if text_dataset is not None: - text_dataset.delete() + text_dataset.delete() @vpcsc_config.skip_if_inside_vpcsc def test_create_and_import_image_dataset(self, dataset_gapic_client): @@ -208,7 +206,9 @@ def test_create_and_import_image_dataset(self, dataset_gapic_client): try: img_dataset = aiplatform.ImageDataset.create( - display_name=f"temp_sdk_integration_create_and_import_dataset_{uuid.uuid4()}", + display_name=self._make_display_name( + key="create_and_import_image_dataset" + ), gcs_source=_TEST_IMAGE_OBJECT_DETECTION_GCS_SOURCE, import_schema_uri=_TEST_IMAGE_OBJ_DET_IMPORT_SCHEMA, create_request_timeout=None, @@ -230,7 +230,7 @@ def test_create_tabular_dataset(self): try: tabular_dataset = aiplatform.TabularDataset.create( - display_name=f"temp_sdk_integration_create_and_import_dataset_{uuid.uuid4()}", + display_name=self._make_display_name(key="create_tabular_dataset"), gcs_source=[_TEST_TABULAR_CLASSIFICATION_GCS_SOURCE], create_request_timeout=None, ) @@ -250,13 +250,15 @@ def test_create_tabular_dataset(self): tabular_dataset.delete() def test_create_tabular_dataset_from_dataframe(self, bigquery_dataset): - bq_staging_table = f"bq://{e2e_base._PROJECT}.{bigquery_dataset.dataset_id}.test_table{uuid.uuid4()}" + bq_staging_table = f"bq://{_TEST_PROJECT}.{bigquery_dataset.dataset_id}.test_table{uuid.uuid4()}" try: tabular_dataset = aiplatform.TabularDataset.create_from_dataframe( df_source=_TEST_DATAFRAME, staging_path=bq_staging_table, - display_name=f"temp_sdk_integration_create_and_import_dataset_from_dataframe{uuid.uuid4()}", + display_name=self._make_display_name( + key="create_and_import_dataset_from_dataframe" + ), ) """Use the Dataset.create_from_dataframe() method to create a new tabular dataset. 
@@ -281,12 +283,14 @@ def test_create_tabular_dataset_from_dataframe_with_provided_schema( created and references the BQ source.""" try: - bq_staging_table = f"bq://{e2e_base._PROJECT}.{bigquery_dataset.dataset_id}.test_table{uuid.uuid4()}" + bq_staging_table = f"bq://{_TEST_PROJECT}.{bigquery_dataset.dataset_id}.test_table{uuid.uuid4()}" tabular_dataset = aiplatform.TabularDataset.create_from_dataframe( df_source=_TEST_DATAFRAME, staging_path=bq_staging_table, - display_name=f"temp_sdk_integration_create_and_import_dataset_from_dataframe{uuid.uuid4()}", + display_name=self._make_display_name( + key="create_and_import_dataset_from_dataframe" + ), bq_schema=_TEST_DATAFRAME_BQ_SCHEMA, ) diff --git a/tests/unit/aiplatform/test_jobs.py b/tests/unit/aiplatform/test_jobs.py index 6b8d908dd2..73a4f8da0c 100644 --- a/tests/unit/aiplatform/test_jobs.py +++ b/tests/unit/aiplatform/test_jobs.py @@ -37,6 +37,7 @@ io as gca_io_compat, job_state as gca_job_state_compat, machine_resources as gca_machine_resources_compat, + manual_batch_tuning_parameters as gca_manual_batch_tuning_parameters_compat, ) from google.cloud.aiplatform_v1.services.job_service import client as job_service_client @@ -132,6 +133,7 @@ _TEST_ACCELERATOR_COUNT = 2 _TEST_STARTING_REPLICA_COUNT = 2 _TEST_MAX_REPLICA_COUNT = 12 +_TEST_BATCH_SIZE = 16 _TEST_LABEL = {"team": "experimentation", "trial_id": "x435"} @@ -725,6 +727,7 @@ def test_batch_predict_with_all_args( credentials=creds, sync=sync, create_request_timeout=None, + batch_size=_TEST_BATCH_SIZE, ) batch_prediction_job.wait_for_resource_creation() @@ -756,6 +759,9 @@ def test_batch_predict_with_all_args( starting_replica_count=_TEST_STARTING_REPLICA_COUNT, max_replica_count=_TEST_MAX_REPLICA_COUNT, ), + manual_batch_tuning_parameters=gca_manual_batch_tuning_parameters_compat.ManualBatchTuningParameters( + batch_size=_TEST_BATCH_SIZE + ), generate_explanation=True, explanation_spec=gca_explanation_compat.ExplanationSpec( 
metadata=_TEST_EXPLANATION_METADATA, diff --git a/tests/unit/aiplatform/test_models.py b/tests/unit/aiplatform/test_models.py index f6561cffaa..eaf63d9fdd 100644 --- a/tests/unit/aiplatform/test_models.py +++ b/tests/unit/aiplatform/test_models.py @@ -49,6 +49,7 @@ env_var as gca_env_var, explanation as gca_explanation, machine_resources as gca_machine_resources, + manual_batch_tuning_parameters as gca_manual_batch_tuning_parameters_compat, model_service as gca_model_service, model_evaluation as gca_model_evaluation, endpoint_service as gca_endpoint_service, @@ -86,6 +87,8 @@ _TEST_STARTING_REPLICA_COUNT = 2 _TEST_MAX_REPLICA_COUNT = 12 +_TEST_BATCH_SIZE = 16 + _TEST_PIPELINE_RESOURCE_NAME = ( "projects/my-project/locations/us-central1/trainingPipeline/12345" ) @@ -1402,47 +1405,47 @@ def test_batch_predict_with_all_args(self, create_batch_prediction_job_mock, syn encryption_spec_key_name=_TEST_ENCRYPTION_KEY_NAME, sync=sync, create_request_timeout=None, + batch_size=_TEST_BATCH_SIZE, ) if not sync: batch_prediction_job.wait() # Construct expected request - expected_gapic_batch_prediction_job = ( - gca_batch_prediction_job.BatchPredictionJob( - display_name=_TEST_BATCH_PREDICTION_DISPLAY_NAME, - model=model_service_client.ModelServiceClient.model_path( - _TEST_PROJECT, _TEST_LOCATION, _TEST_ID - ), - input_config=gca_batch_prediction_job.BatchPredictionJob.InputConfig( - instances_format="jsonl", - gcs_source=gca_io.GcsSource( - uris=[_TEST_BATCH_PREDICTION_GCS_SOURCE] - ), - ), - output_config=gca_batch_prediction_job.BatchPredictionJob.OutputConfig( - gcs_destination=gca_io.GcsDestination( - output_uri_prefix=_TEST_BATCH_PREDICTION_GCS_DEST_PREFIX - ), - predictions_format="csv", - ), - dedicated_resources=gca_machine_resources.BatchDedicatedResources( - machine_spec=gca_machine_resources.MachineSpec( - machine_type=_TEST_MACHINE_TYPE, - accelerator_type=_TEST_ACCELERATOR_TYPE, - accelerator_count=_TEST_ACCELERATOR_COUNT, - ), - 
starting_replica_count=_TEST_STARTING_REPLICA_COUNT, - max_replica_count=_TEST_MAX_REPLICA_COUNT, + expected_gapic_batch_prediction_job = gca_batch_prediction_job.BatchPredictionJob( + display_name=_TEST_BATCH_PREDICTION_DISPLAY_NAME, + model=model_service_client.ModelServiceClient.model_path( + _TEST_PROJECT, _TEST_LOCATION, _TEST_ID + ), + input_config=gca_batch_prediction_job.BatchPredictionJob.InputConfig( + instances_format="jsonl", + gcs_source=gca_io.GcsSource(uris=[_TEST_BATCH_PREDICTION_GCS_SOURCE]), + ), + output_config=gca_batch_prediction_job.BatchPredictionJob.OutputConfig( + gcs_destination=gca_io.GcsDestination( + output_uri_prefix=_TEST_BATCH_PREDICTION_GCS_DEST_PREFIX ), - generate_explanation=True, - explanation_spec=gca_explanation.ExplanationSpec( - metadata=_TEST_EXPLANATION_METADATA, - parameters=_TEST_EXPLANATION_PARAMETERS, + predictions_format="csv", + ), + dedicated_resources=gca_machine_resources.BatchDedicatedResources( + machine_spec=gca_machine_resources.MachineSpec( + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, ), - labels=_TEST_LABEL, - encryption_spec=_TEST_ENCRYPTION_SPEC, - ) + starting_replica_count=_TEST_STARTING_REPLICA_COUNT, + max_replica_count=_TEST_MAX_REPLICA_COUNT, + ), + manual_batch_tuning_parameters=gca_manual_batch_tuning_parameters_compat.ManualBatchTuningParameters( + batch_size=_TEST_BATCH_SIZE + ), + generate_explanation=True, + explanation_spec=gca_explanation.ExplanationSpec( + metadata=_TEST_EXPLANATION_METADATA, + parameters=_TEST_EXPLANATION_PARAMETERS, + ), + labels=_TEST_LABEL, + encryption_spec=_TEST_ENCRYPTION_SPEC, ) create_batch_prediction_job_mock.assert_called_once_with(