Skip to content
This repository was archived by the owner on Nov 12, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 66 additions & 220 deletions google/cloud/bigquery_storage_v1/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def __init__(
initial_request_template: gapic_types.AppendRowsRequest,
*,
metadata: Sequence[Tuple[str, str]] = (),
):
) -> None:
"""Construct a stream manager.

Args:
Expand All @@ -106,154 +106,45 @@ def __init__(
Extra headers to include when sending the streaming request.
"""
self._client = client
self._closing = threading.Lock()
self._closed = False
self._close_callbacks = []
self._futures_queue = queue.Queue()
self._metadata = metadata
self._thread_lock = threading.RLock()
self._closed_connection = {}

self._stream_name = None

# Make a deepcopy of the template and clear the proto3-only fields
self._initial_request_template = _process_request_template(
initial_request_template
)

# Only one call to `send()` should attempt to open the RPC.
self._opening = threading.Lock()

self._rpc = None
self._stream_name = None

# The threads created in ``._open()``.
self._consumer = None
self._connection = _Connection(
client=client,
writer=self,
metadata=metadata,
)

@property
def is_active(self) -> bool:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method meant for use by the user? Do we care that they could see this transition from true -> false -> true during connection transitions?

Copy link
Copy Markdown
Contributor Author

@Linchin Linchin Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The user can use it, but I'm not sure what the use case is. We still have to keep it, though, because removing it would be a breaking change.

Do we care that they could see this transition from true -> false -> true during connection transitions?

We recommend in the docstring that they call this property inside a thread lock. Considering that we are not using multiprocessing in our code, this should prevent users from experiencing a race condition.

"""bool: True if this manager is actively streaming.
"""bool: True if this manager is actively streaming. It is recommended
to call this property inside a thread lock to avoid any race conditions.

Note that ``False`` does not indicate this is completely shut down,
just that it stopped getting new messages.
"""
return self._consumer is not None and self._consumer.is_active
return self._connection is not None and self._connection.is_active
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this access to self._connection have a read lock?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somewhat related to #901 (comment): having the lock wouldn't prevent a race condition from happening. The user would have to call the method from inside a lock, and we have pointed that out in the docstring.


def add_close_callback(self, callback: Callable):
def add_close_callback(self, callback: Callable) -> None:
"""Schedules a callable when the manager closes.
Args:
callback (Callable): The method to call.
"""
self._close_callbacks.append(callback)

def _open(
self,
initial_request: gapic_types.AppendRowsRequest,
timeout: float = _DEFAULT_TIMEOUT,
) -> "AppendRowsFuture":
"""Open an append rows stream.

This is automatically called by the first call to the
:attr:`google.cloud.bigquery_storage_v1.writer.AppendRowsStream.send`
method.

Args:
initial_request:
The initial request to start the stream. Must have the
:attr:`google.cloud.bigquery_storage_v1.types.AppendRowsRequest.write_stream`
and ``proto_rows.writer_schema.proto_descriptor`` properties
populated.
timeout:
How long (in seconds) to wait for the stream to be ready. If
``None``, the wait loop runs without a deadline.

Returns:
A future, which can be used to process the response to the initial
request when it arrives.

Raises:
ValueError:
If this manager is already active.
bqstorage_exceptions.StreamClosedError:
If this manager has already been closed.
exceptions.Unknown:
If the consumer is not active after the wait, i.e. the stream could
not be opened. The manager is closed with this exception as the
reason before it is raised.
"""
if self.is_active:
raise ValueError("This manager is already open.")

if self._closed:
raise bqstorage_exceptions.StreamClosedError(
"This manager has been closed and can not be re-used."
)

# Reference point for enforcing ``timeout`` in the wait loop below.
start_time = time.monotonic()
# Start from a copy of the stored template, then merge the caller's
# request on top of it.
request = gapic_types.AppendRowsRequest()
gapic_types.AppendRowsRequest.copy_from(request, self._initial_request_template)
request._pb.MergeFrom(initial_request._pb)
self._stream_name = request.write_stream
# The trace ID always starts with this library's version; a caller-supplied
# trace ID, if any, is appended after it.
if initial_request.trace_id:
request.trace_id = f"python-writer:{package_version.__version__} {initial_request.trace_id}"
else:
request.trace_id = f"python-writer:{package_version.__version__}"

# Enqueue the future for the initial request before the RPC is created.
inital_response_future = AppendRowsFuture(self)
self._futures_queue.put(inital_response_future)

self._rpc = bidi.BidiRpc(
self._client.append_rows,
initial_request=request,
# TODO: pass in retry and timeout. Blocked by
# https://github.com/googleapis/python-api-core/issues/262
metadata=tuple(
itertools.chain(
self._metadata,
# This header is required so that the BigQuery Storage API
# knows which region to route the request to.
(("x-goog-request-params", f"write_stream={self._stream_name}"),),
)
),
)
self._rpc.add_done_callback(self._on_rpc_done)

self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
self._consumer.start()

# Make sure RPC has started before returning.
# Without this, consumers may get:
#
# ValueError: Can not send() on an RPC that has never been open()ed.
#
# when they try to send a request.
try:
while not self._rpc.is_active and self._consumer.is_active:
# Avoid 100% CPU while waiting for RPC to be ready.
time.sleep(_WRITE_OPEN_INTERVAL)

# TODO: Check retry.deadline instead of (per-request) timeout.
# Blocked by
# https://github.com/googleapis/python-api-core/issues/262
# No deadline was requested: keep polling.
if timeout is None:
continue
current_time = time.monotonic()
# Deadline exceeded: stop waiting and fall through to the consumer
# check below.
if current_time - start_time > timeout:
break
except AttributeError:
# Handle the AttributeError which can occur if the stream is
# unable to be opened. In that case, self._rpc or self._consumer
# may be None.
pass

try:
is_consumer_active = self._consumer.is_active
except AttributeError:
# Handle the AttributeError which can occur if the stream is
# unable to be opened. In that case, self._consumer
# may be None.
is_consumer_active = False

# Something went wrong when opening the RPC.
if not is_consumer_active:
# TODO: Share the exception from _rpc.open(). Blocked by
# https://github.com/googleapis/python-api-core/issues/268
request_exception = exceptions.Unknown(
"There was a problem opening the stream. "
"Try turning on DEBUG level logs to see the error."
)
self.close(reason=request_exception)
raise request_exception

return inital_response_future

def send(self, request: gapic_types.AppendRowsRequest) -> "AppendRowsFuture":
"""Send an append rows request to the open stream.
def send(self, request: gapic_types.AppendRowsRequest) -> AppendRowsFuture:
"""Send an append rows request to the open stream. The name of the
stream is extracted from the first request and cannot be changed.

Args:
request:
Expand All @@ -263,50 +154,17 @@ def send(self, request: gapic_types.AppendRowsRequest) -> "AppendRowsFuture":
A future, which can be used to process the response when it
arrives.
"""
if self._closed:
raise bqstorage_exceptions.StreamClosedError(
"This manager has been closed and can not be used."
)

# If the manager hasn't been opened yet, automatically open it. Only
# one call to `send()` should attempt to open the RPC. After `_open()`,
# the stream is active, unless something went wrong with the first call
# to open, in which case this send will fail anyway due to a closed
# RPC.
with self._opening:
if not self.is_active:
return self._open(request)

# For each request, we expect exactly one response (in order). Add a
# future to the queue so that when the response comes, the callback can
# pull it off and notify completion.
future = AppendRowsFuture(self)
self._futures_queue.put(future)
self._rpc.send(request)
return future

def _on_response(self, response: gapic_types.AppendRowsResponse):
"""Process a response from a consumer callback."""
# If the stream has closed, but somehow we still got a response message
# back, discard it. The response futures queue has been drained, with
# an exception reported.
if self._closed:
raise bqstorage_exceptions.StreamClosedError(
f"Stream closed before receiving response: {response}"
if self._stream_name is None:
self._stream_name = request.write_stream
elif request.write_stream != self._stream_name:
raise ValueError(
"Stream name is already set by the original request as "
f"{self._stream_name}, different from {request.write_stream} "
"in this request. Please use the same name or open a new stream."
)
return self._connection.send(request)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a read lock needed here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in our case it would be fine, because the concurrency library that we use (threading) executes Python bytecode in only one thread at a time under the GIL in standard CPython builds.


# Since we have 1 response per request, if we get here from a response
# callback, the queue should never be empty.
future: AppendRowsFuture = self._futures_queue.get_nowait()
if response.error.code:
exc = exceptions.from_grpc_status(
response.error.code, response.error.message, response=response
)
future.set_exception(exc)
else:
future.set_result(response)

def close(self, reason: Optional[Exception] = None):
def close(self, reason: Optional[Exception] = None) -> None:
"""Stop consuming messages and shutdown all helper threads.

This method is idempotent. Additional calls will have no effect.
Expand All @@ -316,64 +174,51 @@ def close(self, reason: Optional[Exception] = None):
an "intentional" shutdown. This is passed to the callbacks
specified via :meth:`add_close_callback`.
"""
self._shutdown(reason=reason)

def _shutdown(self, reason: Optional[Exception] = None):
"""Run the actual shutdown sequence (stop the stream and all helper threads).

Args:
reason:
The reason to close the stream. If ``None``, this is
considered an "intentional" shutdown.
"""
with self._closing:
if self._closed:
return

# Stop consuming messages.
with self._thread_lock:
if self.is_active:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably just stylistic, but do you want to unconditionally call the Connection's close and have it handle all the internal state checks? is_active is more or less just peeking into the Connection state indirectly.

You could also consider not raising the duplicate close error if the method is really idempotent, but it might require refinements to the Connection implementations you tackle in a followup PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably just stylistic, but do you want to unconditionally call the Connection's close and have it handle all the internal state checks? is_active is more or less just peeking into the Connection state indirectly.

Yeah, I think having AppendRowsStream more decoupled from _Connection will make the code easier to maintain or update in the future.

You could also consider not raising the duplicate close error if the method is really idempotent, but it might require refinements to the Connection implementations you tackle in a followup PR.

It would be a great idea to ensure Connection's close method is idempotent. I can remove the check once this step is done.

_LOGGER.debug("Stopping consumer.")
self._consumer.stop()
self._consumer = None

if self._rpc is not None:
self._rpc.close()
self._closed = True
_LOGGER.debug("Finished stopping manager.")

# We know that no new items will be added to the queue because
# we've marked the stream as closed.
while not self._futures_queue.empty():
# Mark each future as failed. Since the consumer thread has
# stopped (or at least is attempting to stop), we won't get
# response callbacks to populate the remaining futures.
future = self._futures_queue.get_nowait()
if reason is None:
exc = bqstorage_exceptions.StreamClosedError(
"Stream closed before receiving a response."
)
else:
exc = reason
future.set_exception(exc)
self._connection.close(reason=reason)
else:
raise bqstorage_exceptions.StreamClosedError(
"Cannot close again when the connection is already closed."
)
for callback in self._close_callbacks:
callback(self, reason)

for callback in self._close_callbacks:
callback(self, reason)
self._closed = True

def _on_rpc_done(self, future):
"""Triggered whenever the underlying RPC terminates without recovery.
def _renew_connection(self, reason: Optional[Exception] = None) -> None:
"""Helper function that is called when the RPC connection is closed
without recovery. It first creates a new Connection instance in an
atomic manner, and then cleans up the failed connection. Note that a
new RPC connection is not established by instantiating _Connection,
but only when `send()` is called for the first time.
"""
# Creates a new Connection instance, but doesn't establish a new RPC
# connection. New connection is only started when `send()` is called
# again, in order to save resource if the stream is idle. This action
# is atomic.
with self._thread_lock:
_closed_connection = self._connection
self._connection = _Connection(
client=self._client,
writer=self,
metadata=self._metadata,
)

This is typically triggered from one of two threads: the background
consumer thread (when calling ``recv()`` produces a non-recoverable
error) or the grpc management thread (when cancelling the RPC).
# Cleanup, and marks futures as failed. To minimize the length of the
# critical section, this step is not guaranteed to be atomic.
_closed_connection._shutdown(reason=reason)

This method is *non-blocking*. It will start another thread to deal
with shutting everything down. This is to prevent blocking in the
background consumer and preventing it from being ``joined()``.
def _on_rpc_done(self, reason: Optional[Exception] = None) -> None:
"""Callback passed to _Connection. It's called when the RPC connection
is closed without recovery. Spins up a new thread to call the helper
function `_renew_connection()`, which creates a new connection and
cleans up the current one.
"""
_LOGGER.info("RPC termination has signaled streaming pull manager shutdown.")
error = _wrap_as_exception(future)
thread = threading.Thread(
name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error}
name=_RPC_ERROR_THREAD_NAME,
target=self._renew_connection,
kwargs={"reason": reason},
)
thread.daemon = True
thread.start()
Expand Down Expand Up @@ -661,7 +506,8 @@ def _on_rpc_done(self, future: AppendRowsFuture) -> None:
Calls the callback from AppendRowsStream to handle the cleanup and
possible retries.
"""
self._writer._on_rpc_done(future)
error = _wrap_as_exception(future)
self._writer._on_rpc_done(reason=error)


class AppendRowsFuture(polling_future.PollingFuture):
Expand Down
13 changes: 9 additions & 4 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,25 @@ def local_shakespeare_table_reference(project_id, use_mtls):
return _TABLE_FORMAT.format(project_id, "public_samples_copy", "shakespeare")


@pytest.fixture(scope="session")
def dataset(project_id, bq_client):
def _make_dataset(project_id, bq_client, location):
from google.cloud import bigquery

dataset_name = prefixer.create_prefix()

dataset_id = "{}.{}".format(project_id, dataset_name)
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
dataset.location = location
created_dataset = bq_client.create_dataset(dataset)

return created_dataset


@pytest.fixture(scope="session")
def dataset(project_id, bq_client):
created_dataset = _make_dataset(project_id, bq_client, location="US")
yield created_dataset

bq_client.delete_dataset(dataset, delete_contents=True)
bq_client.delete_dataset(created_dataset, delete_contents=True)


@pytest.fixture(scope="session")
Expand Down
Loading