diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java index dab2073c95..a55823ec55 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java @@ -172,14 +172,21 @@ public void insertAll(TableReference ref, List rowList) throws IOExcep public void insertAll(TableReference ref, List rowList, @Nullable List insertIdList, Aggregator byteCountAggregator) throws IOException { + insertAll( + ref, rowList, insertIdList, byteCountAggregator, INSERT_BACKOFF_FACTORY.backoff(), + Sleeper.DEFAULT); + } + + @VisibleForTesting + void insertAll(TableReference ref, List rowList, + @Nullable List insertIdList, Aggregator byteCountAggregator, + BackOff backoff, final Sleeper sleeper) throws IOException { checkNotNull(ref, "ref"); if (insertIdList != null && rowList.size() != insertIdList.size()) { throw new AssertionError("If insertIdList is not null it needs to have at least " + "as many elements as rowList"); } - BackOff backoff = INSERT_BACKOFF_FACTORY.backoff(); - List allErrors = new ArrayList<>(); // These lists contain the rows to publish. Initially the contain the entire list. If there are // failures, they will contain only the failed rows to be retried. 
@@ -229,7 +236,7 @@ public List call() throws IOException { if (new ApiErrorExtractor().rateLimited(e)) { LOG.info("BigQuery insertAll exceeded rate limit, retrying"); try { - Thread.sleep(backoff.nextBackOffMillis()); + sleeper.sleep(backoff.nextBackOffMillis()); } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); throw new IOException( @@ -284,18 +291,20 @@ public List call() throws IOException { long nextBackoffMillis = backoff.nextBackOffMillis(); if (nextBackoffMillis == BackOff.STOP) { - try { - Thread.sleep(backoff.nextBackOffMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while waiting before retrying insert of " + retryRows); - } - LOG.info("Retrying failed inserts to BigQuery"); - rowsToPublish = retryRows; - idsToPublish = retryIds; - allErrors.clear(); + break; } + try { + sleeper.sleep(nextBackoffMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting before retrying insert of " + retryRows); + } + rowsToPublish = retryRows; + idsToPublish = retryIds; + allErrors.clear(); + LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size()); } + if (!allErrors.isEmpty()) { throw new IOException("Insert failed: " + allErrors); } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java index 86448f6ed4..102bd17f4b 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java @@ -17,8 +17,11 @@ package com.google.cloud.dataflow.sdk.util; import static com.google.common.base.Verify.verifyNotNull; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.times; @@ -36,11 +39,13 @@ import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.client.testing.http.MockHttpTransport; import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.testing.util.MockSleeper; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableDataInsertAllResponse; +import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; @@ -56,6 +61,8 @@ import org.junit.runners.JUnit4; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -260,13 +267,92 @@ public void testInsertRetry() throws IOException { BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); - inserter.insertAll(ref, rows); + inserter.insertAll(ref, rows, null, null, TEST_BACKOFF.backoff(), new MockSleeper()); verify(response, times(2)).getStatusCode(); verify(response, times(2)).getContent(); verify(response, times(2)).getContentType(); expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying"); } + /** + * Tests that {@link BigQueryTableInserter#insertAll} retries selected rows on failure. 
+ */ + @Test + public void testInsertRetrySelectRows() throws Exception { + TableReference ref = + new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); + List rows = ImmutableList.of( + new TableRow().set("row", "a"), new TableRow().set("row", "b")); + List insertIds = ImmutableList.of("a", "b"); + + final TableDataInsertAllResponse bFailed = new TableDataInsertAllResponse() + .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L))); + + final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse(); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200).thenReturn(200); + when(response.getContent()) + .thenReturn(toStream(bFailed)).thenReturn(toStream(allRowsSucceeded)); + + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + inserter.insertAll(ref, rows, insertIds, null, TEST_BACKOFF.backoff(), new MockSleeper()); + verify(response, times(2)).getStatusCode(); + verify(response, times(2)).getContent(); + verify(response, times(2)).getContentType(); + expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery"); + } + + // A BackOff that makes a total of 4 attempts + private static final FluentBackoff TEST_BACKOFF = FluentBackoff.DEFAULT.withMaxRetries(3); + + /** + * Tests that {@link BigQueryTableInserter#insertAll} fails gracefully when there are persistent issues. 
+ */ + @Test + public void testInsertFailsGracefully() throws Exception { + TableReference ref = + new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); + List rows = ImmutableList.of(new TableRow(), new TableRow()); + + final TableDataInsertAllResponse row1Failed = new TableDataInsertAllResponse() + .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L))); + + final TableDataInsertAllResponse row0Failed = new TableDataInsertAllResponse() + .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(0L))); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + // Always return 200. + when(response.getStatusCode()).thenReturn(200); + // Return row 1 failing, then we retry row 1 as row 0, and row 0 persistently fails. + when(response.getContent()) + .thenReturn(toStream(row1Failed)) + .thenAnswer(new Answer() { + @Override + public InputStream answer(InvocationOnMock invocation) throws Throwable { + return toStream(row0Failed); + } + }); + + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + + // Expect it to fail. + try { + inserter.insertAll(ref, rows, null, null, TEST_BACKOFF.backoff(), new MockSleeper()); + fail(); + } catch (IOException e) { + assertThat(e, instanceOf(IOException.class)); + assertThat(e.getMessage(), containsString("Insert failed:")); + assertThat(e.getMessage(), containsString("[{\"index\":0}]")); + } + + // Verify the exact number of retries as well as log messages. + verify(response, times(4)).getStatusCode(); + verify(response, times(4)).getContent(); + verify(response, times(4)).getContentType(); + expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery"); + } + /** * Tests that {@link BigQueryTableInserter#insertAll} does not retry non-rate-limited attempts. 
*/ @@ -291,7 +377,7 @@ public void testInsertDoesNotRetry() throws Throwable { BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); try { - inserter.insertAll(ref, rows); + inserter.insertAll(ref, rows, null, null, TEST_BACKOFF.backoff(), new MockSleeper()); fail(); } catch (RuntimeException e) { verify(response, times(1)).getStatusCode();