Skip to content
This repository was archived by the owner on Nov 12, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion google/cloud/bigquery_storage_v1/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def _on_response(self, response: gapic_types.AppendRowsResponse):
future: AppendRowsFuture = self._futures_queue.get_nowait()
if response.error.code:
exc = exceptions.from_grpc_status(
response.error.code, response.error.message
response.error.code, response.error.message, response=response
)
future.set_exception(exc)
else:
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/bigquery_storage_v1beta2/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ def _on_response(self, response: gapic_types.AppendRowsResponse):
future: AppendRowsFuture = self._futures_queue.get_nowait()
if response.error.code:
exc = exceptions.from_grpc_status(
response.error.code, response.error.message
response.error.code, response.error.message, response=response
)
future.set_exception(exc)
else:
Expand Down
50 changes: 39 additions & 11 deletions tests/system/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def table(project_id, dataset, bq_client):
schema = [
bigquery.SchemaField("first_name", "STRING", mode="NULLABLE"),
bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"),
bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
]

unique_suffix = str(uuid.uuid4()).replace("-", "_")
Expand All @@ -52,15 +52,8 @@ def bqstorage_write_client(credentials):
return bigquery_storage_v1.BigQueryWriteClient(credentials=credentials)


def test_append_rows_with_invalid_stream_name_fails_fast(bqstorage_write_client):
bad_request = gapic_types.AppendRowsRequest()
bad_request.write_stream = "this-is-an-invalid-stream-resource-path"

with pytest.raises(exceptions.GoogleAPICallError):
bqstorage_write_client.append_rows(bad_request)


def test_append_rows_with_proto3(bqstorage_write_client, table):
@pytest.fixture(scope="function")
def append_rows_stream(bqstorage_write_client, table):
person_pb = person_pb2.PersonProto()

stream_name = f"projects/{table.project}/datasets/{table.dataset_id}/tables/{table.table_id}/_default"
Expand All @@ -81,11 +74,22 @@ def test_append_rows_with_proto3(bqstorage_write_client, table):
bqstorage_write_client,
request_template,
)
return append_rows_stream


def test_append_rows_with_invalid_stream_name_fails_fast(bqstorage_write_client):
bad_request = gapic_types.AppendRowsRequest()
bad_request.write_stream = "this-is-an-invalid-stream-resource-path"

with pytest.raises(exceptions.GoogleAPICallError):
bqstorage_write_client.append_rows(bad_request)


def test_append_rows_with_proto3(append_rows_stream):
request = gapic_types.AppendRowsRequest()
proto_data = gapic_types.AppendRowsRequest.ProtoData()
proto_rows = gapic_types.ProtoRows()
row = person_pb
row = person_pb2.PersonProto()
row.first_name = "fn"
row.last_name = "ln"
row.age = 20
Expand All @@ -96,3 +100,27 @@ def test_append_rows_with_proto3(bqstorage_write_client, table):

assert response_future.result()
# The request should success


def test_append_rows_with_proto3_got_response_on_failure(append_rows_stream):
"""When the request fails and there is a response, verify that the response
is included in the exception. For more details, see
https://github.com/googleapis/python-bigquery-storage/issues/836
"""

# Make an invalid request by leaving the required field row.age blank.
request = gapic_types.AppendRowsRequest()
proto_data = gapic_types.AppendRowsRequest.ProtoData()
proto_rows = gapic_types.ProtoRows()
row = person_pb2.PersonProto()
row.first_name = "fn"
row.last_name = "ln"
proto_rows.serialized_rows.append(row.SerializeToString())
proto_data.rows = proto_rows
request.proto_rows = proto_data
response_future = append_rows_stream.send(request)

with pytest.raises(exceptions.GoogleAPICallError) as excinfo:
response_future.result()

assert isinstance(excinfo.value.response, gapic_types.AppendRowsResponse)