diff --git a/google/cloud/bigquery/_job_helpers.py b/google/cloud/bigquery/_job_helpers.py index e66ab2763..b028cd357 100644 --- a/google/cloud/bigquery/_job_helpers.py +++ b/google/cloud/bigquery/_job_helpers.py @@ -47,6 +47,7 @@ from google.cloud.bigquery import job import google.cloud.bigquery.query from google.cloud.bigquery import table +import google.cloud.bigquery.retry from google.cloud.bigquery.retry import POLLING_DEFAULT_VALUE # Avoid circular imports @@ -142,12 +143,28 @@ def do_query(): raise create_exc try: + # Sometimes we get a 404 after a Conflict. In this case, we + # have pretty high confidence that by retrying the 404, we'll + # (hopefully) eventually recover the job. + # https://github.com/googleapis/python-bigquery/issues/2134 + # + # Allow users who want to completely disable retries to + # continue to do so by setting retry to None. + get_job_retry = retry + if retry is not None: + # TODO(tswast): Amend the user's retry object with allowing + # 404 to retry when there's a public way to do so. + # https://github.com/googleapis/python-api-core/issues/796 + get_job_retry = ( + google.cloud.bigquery.retry._DEFAULT_GET_JOB_CONFLICT_RETRY + ) + query_job = client.get_job( job_id, project=project, location=location, - retry=retry, - timeout=timeout, + retry=get_job_retry, + timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT, ) except core_exceptions.GoogleAPIError: # (includes RetryError) raise @@ -156,7 +173,13 @@ def do_query(): else: return query_job + # Allow users who want to completely disable retries to + # continue to do so by setting job_retry to None. + if job_retry is not None: + do_query = google.cloud.bigquery.retry._DEFAULT_QUERY_JOB_INSERT_RETRY(do_query) + future = do_query() + # The future might be in a failed state now, but if it's # unrecoverable, we'll find out when we ask for it's result, at which # point, we may retry. diff --git a/google/cloud/bigquery/retry.py b/google/cloud/bigquery/retry.py index 10958980d..999d0e851 100644 --- a/google/cloud/bigquery/retry.py +++ b/google/cloud/bigquery/retry.py @@ -82,6 +82,32 @@ def _should_retry(exc): pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``. """ + +def _should_retry_get_job_conflict(exc): + """Predicate for determining when to retry a jobs.get call after a conflict error. + + Sometimes we get a 404 after a Conflict. In this case, we + have pretty high confidence that by retrying the 404, we'll + (hopefully) eventually recover the job. + https://github.com/googleapis/python-bigquery/issues/2134 + + Note: we may be able to extend this to user-specified predicates + after https://github.com/googleapis/python-api-core/issues/796 + to tweak existing Retry object predicates. + """ + return isinstance(exc, exceptions.NotFound) or _should_retry(exc) + + +# Pick a deadline smaller than our other deadlines since we want to timeout +# before those expire. +_DEFAULT_GET_JOB_CONFLICT_DEADLINE = _DEFAULT_RETRY_DEADLINE / 3.0 +_DEFAULT_GET_JOB_CONFLICT_RETRY = retry.Retry( + predicate=_should_retry_get_job_conflict, + deadline=_DEFAULT_GET_JOB_CONFLICT_DEADLINE, +) +"""Private, may be removed in future.""" + + # Note: Take care when updating DEFAULT_TIMEOUT to anything but None. We # briefly had a default timeout, but even setting it at more than twice the # theoretical server-side default timeout of 2 minutes was not enough for @@ -142,6 +168,34 @@ def _job_should_retry(exc): The default job retry object. """ + +def _query_job_insert_should_retry(exc): + # Per https://github.com/googleapis/python-bigquery/issues/2134, sometimes + # we get a 404 error. In this case, if we get this far, assume that the job + # doesn't actually exist and try again. We can't add 404 to the default + # job_retry because that happens for errors like "this table does not + # exist", which probably won't resolve with a retry. + if isinstance(exc, exceptions.RetryError): + exc = exc.cause + + if isinstance(exc, exceptions.NotFound): + message = exc.message + # Don't try to retry table/dataset not found, just job not found. + # The URL contains jobs, so use whitespace to disambiguate. + return message is not None and " job" in message.lower() + + return _job_should_retry(exc) + + +_DEFAULT_QUERY_JOB_INSERT_RETRY = retry.Retry( + predicate=_query_job_insert_should_retry, + # jobs.insert doesn't wait for the job to complete, so we don't need the + # long _DEFAULT_JOB_DEADLINE for this part. + deadline=_DEFAULT_RETRY_DEADLINE, +) +"""Private, may be removed in future.""" + + DEFAULT_GET_JOB_TIMEOUT = 128 """ Default timeout for Client.get_job(). diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 6897c2552..4f13d6ecc 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -28,9 +28,12 @@ from unittest import mock import warnings -import requests +import freezegun import packaging import pytest +import requests + +import google.api try: @@ -55,6 +58,8 @@ import google.cloud._helpers from google.cloud import bigquery +from google.cloud.bigquery import job as bqjob +import google.cloud.bigquery._job_helpers from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery import exceptions from google.cloud.bigquery import ParquetOptions @@ -5308,6 +5313,173 @@ def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_fails(self): with pytest.raises(DataLoss, match="we lost your job, sorry"): client.query("SELECT 1;", job_id=None) + def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_fails_no_retries(self): + from google.api_core.exceptions import Conflict + from google.api_core.exceptions import DataLoss + from google.cloud.bigquery.job import QueryJob + + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + + job_create_error = Conflict("Job already exists.") + job_begin_patcher = mock.patch.object( + QueryJob, "_begin", side_effect=job_create_error + ) + get_job_patcher = mock.patch.object( + client, "get_job", side_effect=DataLoss("we lost your job, sorry") + ) + + with job_begin_patcher, get_job_patcher: + # If get job request fails but supposedly there does exist a job + # with this ID already, raise the exception explaining why we + # couldn't recover the job. + with pytest.raises(DataLoss, match="we lost your job, sorry"): + client.query( + "SELECT 1;", + job_id=None, + # Explicitly test with no retries to make sure those branches are covered. + retry=None, + job_retry=None, + ) + + def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_retries_404(self): + """Regression test for https://github.com/googleapis/python-bigquery/issues/2134 + + Sometimes after a Conflict, the fetch fails with a 404, but we know + because of the conflict that really the job does exist. Retry until we + get the job status (or timeout). + """ + job_id = "abc123" + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection( + # We're mocking QueryJob._begin, so this is only going to be + # jobs.get requests and responses. + google.api_core.exceptions.TooManyRequests("this is retriable by default"), + google.api_core.exceptions.NotFound("we lost your job"), + google.api_core.exceptions.NotFound("we lost your job again, sorry"), + { + "jobReference": { + "projectId": self.PROJECT, + "location": "TESTLOC", + "jobId": job_id, + } + }, + ) + + job_create_error = google.api_core.exceptions.Conflict("Job already exists.") + job_begin_patcher = mock.patch.object( + bqjob.QueryJob, "_begin", side_effect=job_create_error + ) + job_id_patcher = mock.patch.object( + google.cloud.bigquery._job_helpers, + "make_job_id", + return_value=job_id, + ) + + with job_begin_patcher, job_id_patcher: + # If get job request fails there does exist a job + # with this ID already, retry 404 until we get it (or fails for a + # non-retriable reason, see other tests). + result = client.query("SELECT 1;", job_id=None) + + jobs_get_path = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/jobs/{job_id}", + query_params={ + "projection": "full", + }, + timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT, + ) + conn.api_request.assert_has_calls( + # Double-check that it was jobs.get that was called for each of our + # mocked responses. + [jobs_get_path] + * 4, + ) + assert result.job_id == job_id + + def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_retries_404_and_query_job_insert( + self, + ): + """Regression test for https://github.com/googleapis/python-bigquery/issues/2134 + + Sometimes after a Conflict, the fetch fails with a 404. If it keeps + failing with a 404, assume that the job actually doesn't exist. + """ + job_id_1 = "abc123" + job_id_2 = "xyz789" + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + + # We're mocking QueryJob._begin, so that the connection should only get + # jobs.get requests. + job_create_error = google.api_core.exceptions.Conflict("Job already exists.") + job_begin_patcher = mock.patch.object( + bqjob.QueryJob, "_begin", side_effect=job_create_error + ) + conn = client._connection = make_connection( + google.api_core.exceptions.NotFound("we lost your job again, sorry"), + { + "jobReference": { + "projectId": self.PROJECT, + "location": "TESTLOC", + "jobId": job_id_2, + } + }, + ) + + # Choose a small deadline so the 404 retries give up. + retry = ( + google.cloud.bigquery.retry._DEFAULT_GET_JOB_CONFLICT_RETRY.with_deadline(1) + ) + job_id_patcher = mock.patch.object( + google.cloud.bigquery._job_helpers, + "make_job_id", + side_effect=[job_id_1, job_id_2], + ) + retry_patcher = mock.patch.object( + google.cloud.bigquery.retry, + "_DEFAULT_GET_JOB_CONFLICT_RETRY", + retry, + ) + + with freezegun.freeze_time( + "2025-01-01 00:00:00", + # 10x the retry deadline to guarantee a timeout. + auto_tick_seconds=10, + ), job_begin_patcher, job_id_patcher, retry_patcher: + # If get job request fails there does exist a job + # with this ID already, retry 404 until we get it (or fails for a + # non-retriable reason, see other tests). + result = client.query("SELECT 1;", job_id=None) + + jobs_get_path_1 = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/jobs/{job_id_1}", + query_params={ + "projection": "full", + }, + timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT, + ) + jobs_get_path_2 = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/jobs/{job_id_2}", + query_params={ + "projection": "full", + }, + timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT, + ) + conn.api_request.assert_has_calls( + # Double-check that it was jobs.get that was called for each of our + # mocked responses. + [jobs_get_path_1, jobs_get_path_2], + ) + assert result.job_id == job_id_2 + def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_succeeds(self): from google.api_core.exceptions import Conflict from google.cloud.bigquery.job import QueryJob