Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,21 @@ public void insertAll(TableReference ref, List<TableRow> rowList) throws IOExcep
public void insertAll(TableReference ref, List<TableRow> rowList,
@Nullable List<String> insertIdList, Aggregator<Long, Long> byteCountAggregator)
throws IOException {
insertAll(
ref, rowList, insertIdList, byteCountAggregator, INSERT_BACKOFF_FACTORY.backoff(),
Sleeper.DEFAULT);
}

@VisibleForTesting
void insertAll(TableReference ref, List<TableRow> rowList,
@Nullable List<String> insertIdList, Aggregator<Long, Long> byteCountAggregator,
BackOff backoff, final Sleeper sleeper) throws IOException {
checkNotNull(ref, "ref");
if (insertIdList != null && rowList.size() != insertIdList.size()) {
throw new AssertionError("If insertIdList is not null it needs to have at least "
+ "as many elements as rowList");
}

BackOff backoff = INSERT_BACKOFF_FACTORY.backoff();

List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
// These lists contain the rows to publish. Initially the contain the entire list. If there are
// failures, they will contain only the failed rows to be retried.
Expand Down Expand Up @@ -229,7 +236,7 @@ public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
if (new ApiErrorExtractor().rateLimited(e)) {
LOG.info("BigQuery insertAll exceeded rate limit, retrying");
try {
Thread.sleep(backoff.nextBackOffMillis());
sleeper.sleep(backoff.nextBackOffMillis());
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
throw new IOException(
Expand Down Expand Up @@ -284,18 +291,20 @@ public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {

long nextBackoffMillis = backoff.nextBackOffMillis();
if (nextBackoffMillis == BackOff.STOP) {
try {
Thread.sleep(backoff.nextBackOffMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
}
LOG.info("Retrying failed inserts to BigQuery");
rowsToPublish = retryRows;
idsToPublish = retryIds;
allErrors.clear();
break;
}
try {
sleeper.sleep(nextBackoffMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
}
rowsToPublish = retryRows;
idsToPublish = retryIds;
allErrors.clear();
LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size());
}

if (!allErrors.isEmpty()) {
throw new IOException("Insert failed: " + allErrors);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package com.google.cloud.dataflow.sdk.util;

import static com.google.common.base.Verify.verifyNotNull;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.times;
Expand All @@ -36,11 +39,13 @@
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.testing.util.MockSleeper;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
Expand All @@ -56,6 +61,8 @@
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -260,13 +267,92 @@ public void testInsertRetry() throws IOException {

BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);

inserter.insertAll(ref, rows);
inserter.insertAll(ref, rows, null, null, TEST_BACKOFF.backoff(), new MockSleeper());
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
verify(response, times(2)).getContentType();
expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying");
}

/**
* Tests that {@link BigQueryTableInserter#insertAll} retries selected rows on failure.
*/
@Test
public void testInsertRetrySelectRows() throws Exception {
TableReference ref =
new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<TableRow> rows = ImmutableList.of(
new TableRow().set("row", "a"), new TableRow().set("row", "b"));
List<String> insertIds = ImmutableList.of("a", "b");

final TableDataInsertAllResponse bFailed = new TableDataInsertAllResponse()
.setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L)));

final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse();

when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
when(response.getStatusCode()).thenReturn(200).thenReturn(200);
when(response.getContent())
.thenReturn(toStream(bFailed)).thenReturn(toStream(allRowsSucceeded));

BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
inserter.insertAll(ref, rows, insertIds, null, TEST_BACKOFF.backoff(), new MockSleeper());
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
verify(response, times(2)).getContentType();
expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
}

// A BackOff that makes a total of 4 attempts
private static final FluentBackoff TEST_BACKOFF = FluentBackoff.DEFAULT.withMaxRetries(3);

/**
* Tests that {@link BigQueryTableInserter#insertAll} fails gracefully when persistent issues.
*/
@Test
public void testInsertFailsGracefully() throws Exception {
TableReference ref =
new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<TableRow> rows = ImmutableList.of(new TableRow(), new TableRow());

final TableDataInsertAllResponse row1Failed = new TableDataInsertAllResponse()
.setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L)));

final TableDataInsertAllResponse row0Failed = new TableDataInsertAllResponse()
.setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(0L)));

when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
// Always return 200.
when(response.getStatusCode()).thenReturn(200);
// Return row 1 failing, then we retry row 1 as row 0, and row 0 persistently fails.
when(response.getContent())
.thenReturn(toStream(row1Failed))
.thenAnswer(new Answer<InputStream>() {
@Override
public InputStream answer(InvocationOnMock invocation) throws Throwable {
return toStream(row0Failed);
}
});

BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);

// Expect it to fail.
try {
inserter.insertAll(ref, rows, null, null, TEST_BACKOFF.backoff(), new MockSleeper());
fail();
} catch (IOException e) {
assertThat(e, instanceOf(IOException.class));
assertThat(e.getMessage(), containsString("Insert failed:"));
assertThat(e.getMessage(), containsString("[{\"index\":0}]"));
}

// Verify the exact number of retries as well as log messages.
verify(response, times(4)).getStatusCode();
verify(response, times(4)).getContent();
verify(response, times(4)).getContentType();
expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
}

/**
* Tests that {@link BigQueryTableInserter#insertAll} does not retry non-rate-limited attempts.
*/
Expand All @@ -291,7 +377,7 @@ public void testInsertDoesNotRetry() throws Throwable {
BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);

try {
inserter.insertAll(ref, rows);
inserter.insertAll(ref, rows, null, null, TEST_BACKOFF.backoff(), new MockSleeper());
fail();
} catch (RuntimeException e) {
verify(response, times(1)).getStatusCode();
Expand Down