diff --git a/google/cloud/pubsub_v1/opentelemetry/__init__.py b/google/cloud/pubsub_v1/opentelemetry/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/google/cloud/pubsub_v1/opentelemetry/context_propagation.py b/google/cloud/pubsub_v1/opentelemetry/context_propagation.py new file mode 100644 index 000000000..fd22a2d38 --- /dev/null +++ b/google/cloud/pubsub_v1/opentelemetry/context_propagation.py @@ -0,0 +1,15 @@ +from opentelemetry.propagators.textmap import Getter +from google.pubsub_v1 import types as gapic_types +import typing + + +class OTelContextGetter(Getter): + """Used to extract Open Telemetry Context from message attributes.""" + + def get( + self, carrier: gapic_types.PubsubMessage, key: str + ) -> typing.Optional[typing.List[str]]: + return [carrier.attributes["googclient_" + key]] + + def keys(self, carrier: gapic_types.PubsubMessage) -> typing.List[str]: + return list(carrier.attributes.keys()) diff --git a/google/cloud/pubsub_v1/opentelemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/opentelemetry/publish_message_wrapper.py new file mode 100644 index 000000000..db3c04cca --- /dev/null +++ b/google/cloud/pubsub_v1/opentelemetry/publish_message_wrapper.py @@ -0,0 +1,16 @@ +from google.pubsub_v1 import types as gapic_types +from opentelemetry import trace +from dataclasses import dataclass, field + +from typing import Optional + + +@dataclass +class PublishMessageWrapper: + """ + Wraps Pub/Sub message with additional metadata required for + Open Telemetry tracing. 
+ """ + + message: gapic_types.PubsubMessage + span: Optional[trace.Span] = field(default=None) diff --git a/google/cloud/pubsub_v1/opentelemetry/subscribe_spans_data.py b/google/cloud/pubsub_v1/opentelemetry/subscribe_spans_data.py new file mode 100644 index 000000000..67f0af3d9 --- /dev/null +++ b/google/cloud/pubsub_v1/opentelemetry/subscribe_spans_data.py @@ -0,0 +1,22 @@ +from dataclasses import dataclass +from typing import Optional + +from opentelemetry.trace.span import Span + + +@dataclass +class OpenTelemetryData: + """ + This class is for internal use by the library only. + Contains Open Telmetry data associated with a + google.cloud.pubsub_v1.subscriber.message.Message. Specifically it contains + the subscriber side spans associated with the message. + + This is so that the subsriber side spans can be ended by the library + after receiving the message back via an ack(), ack_with_response(), nack(). + """ + + subscribe_span: Optional[Span] = None + concurrency_control_span: Optional[Span] = None + scheduler_span: Optional[Span] = None + process_span: Optional[Span] = None diff --git a/google/cloud/pubsub_v1/publisher/_batch/base.py b/google/cloud/pubsub_v1/publisher/_batch/base.py index 52505996b..56585b416 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/base.py +++ b/google/cloud/pubsub_v1/publisher/_batch/base.py @@ -24,6 +24,9 @@ from google.cloud import pubsub_v1 from google.cloud.pubsub_v1 import types from google.pubsub_v1 import types as gapic_types + from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import ( + PublishMessageWrapper, + ) class Batch(metaclass=abc.ABCMeta): @@ -54,7 +57,7 @@ class Batch(metaclass=abc.ABCMeta): def __len__(self): """Return the number of messages currently in the batch.""" - return len(self.messages) + return len(self.message_wrappers) @staticmethod @abc.abstractmethod @@ -68,7 +71,7 @@ def make_lock(): # pragma: NO COVER @property @abc.abstractmethod - def messages(self) -> 
Sequence["gapic_types.PubsubMessage"]: # pragma: NO COVER + def message_wrappers(self) -> Sequence["PublishMessageWrapper"]: # pragma: NO COVER """Return the messages currently in the batch. Returns: diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index 1617f8c90..178b2a293 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -17,13 +17,19 @@ import logging import threading import time +from datetime import datetime import typing from typing import Any, Callable, List, Optional, Sequence +from opentelemetry import trace + import google.api_core.exceptions from google.api_core import gapic_v1 from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher import futures +from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) from google.cloud.pubsub_v1.publisher._batch import base from google.pubsub_v1 import types as gapic_types @@ -108,7 +114,7 @@ def __init__( # status changed from ACCEPTING_MESSAGES to any other # in order to avoid race conditions self._futures: List[futures.Future] = [] - self._messages: List[gapic_types.PubsubMessage] = [] + self._message_wrappers: List[PublishMessageWrapper] = [] self._status = base.BatchStatus.ACCEPTING_MESSAGES # The initial size is not zero, we need to account for the size overhead @@ -134,9 +140,9 @@ def client(self) -> "PublisherClient": return self._client @property - def messages(self) -> Sequence[gapic_types.PubsubMessage]: + def message_wrappers(self) -> Sequence[PublishMessageWrapper]: """The messages currently in the batch.""" - return self._messages + return self._message_wrappers @property def settings(self) -> "types.BatchSettings": @@ -259,7 +265,7 @@ def _commit(self) -> None: # https://github.com/googleapis/google-cloud-python/issues/8036 # Sanity check: If there are no messages, no-op. 
- if not self._messages: + if not self._message_wrappers: _LOGGER.debug("No messages to publish, exiting commit") self._status = base.BatchStatus.SUCCESS return @@ -270,18 +276,76 @@ def _commit(self) -> None: batch_transport_succeeded = True try: + if self._client._open_telemetry_enabled: + tracer = trace.get_tracer("com.google.cloud.pubsub.v1") + links = [] + for wrapper in self._message_wrappers: + span = wrapper.span + if span and span.is_recording(): + links.append(trace.Link(span.get_span_context())) + with tracer.start_as_current_span( + name=f"{self._topic} publish", + attributes={ + "messaging.system": "com.google.cloud.pubsub.v1", + "messaging.destination.name": self._topic, + "gcp.project_id": self._topic.split("/")[1], + "messaging.batch.message_count": len(self._message_wrappers), + "messaging.operation": "publish", + "code.function": "_commit", + }, + links=links if len(links) > 0 else None, + kind=trace.SpanKind.CLIENT, + end_on_exit=False, + ) as publish_rpc_span: + ctx = publish_rpc_span.get_span_context() + for wrapper in self._message_wrappers: + span = wrapper.span + if span and span.is_recording(): + span.add_link(ctx) # Performs retries for errors defined by the retry configuration. response = self._client._gapic_publish( topic=self._topic, - messages=self._messages, + messages=[wrapper.message for wrapper in self._message_wrappers], retry=self._commit_retry, timeout=self._commit_timeout, ) + if self._client._open_telemetry_enabled: + publish_rpc_span.end() + for message_id, wrapper in zip( + response.message_ids, self._message_wrappers + ): + span = wrapper.span + if span: + span.add_event( + name="publish end", + attributes={ + "timestamp": str(datetime.now()), + }, + ) + span.set_attribute(key="messaging.message.id", value=message_id) + span.end() except google.api_core.exceptions.GoogleAPIError as exc: # We failed to publish, even after retries, so set the exception on # all futures and exit. 
self._status = base.BatchStatus.ERROR + if self._client._open_telemetry_enabled: + publish_rpc_span.record_exception( + exception=exc, + ) + publish_rpc_span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR) + ) + publish_rpc_span.end() + for wrapper in self._message_wrappers: + span = wrapper.span + if span: + span.record_exception(exception=exc) + span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR), + ) + span.end() + batch_transport_succeeded = False if self._batch_done_callback is not None: # Failed to publish batch. @@ -326,7 +390,8 @@ def _commit(self) -> None: self._batch_done_callback(batch_transport_succeeded) def publish( - self, message: gapic_types.PubsubMessage + self, + message_wrapper: PublishMessageWrapper, ) -> Optional["pubsub_v1.publisher.futures.Future"]: """Publish a single message. @@ -351,12 +416,16 @@ def publish( """ # Coerce the type, just in case. - if not isinstance(message, gapic_types.PubsubMessage): + if not isinstance( + message_wrapper.message, gapic_types.PubsubMessage + ): # pragma: NO COVER # For performance reasons, the message should be constructed by directly # using the raw protobuf class, and only then wrapping it into the # higher-level PubsubMessage class. 
- vanilla_pb = _raw_proto_pubbsub_message(**message) - message = gapic_types.PubsubMessage.wrap(vanilla_pb) + vanilla_pb = _raw_proto_pubbsub_message(**message_wrapper.message) + message_wrapper.message = vanilla_pb.gapic_types.PubsubMessage.wrap( + vanilla_pb + ) future = None @@ -369,7 +438,7 @@ def publish( return None size_increase = gapic_types.PublishRequest( - messages=[message] + messages=[message_wrapper.message] )._pb.ByteSize() if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES: @@ -381,16 +450,15 @@ def publish( raise exceptions.MessageTooLargeError(err_msg) new_size = self._size + size_increase - new_count = len(self._messages) + 1 + new_count = len(self._message_wrappers) + 1 size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES) overflow = new_size > size_limit or new_count >= self.settings.max_messages - if not self._messages or not overflow: + if not self._message_wrappers or not overflow: # Store the actual message in the batch's message queue. - self._messages.append(message) + self._message_wrappers.append(message_wrapper) self._size = new_size - # Track the future on this batch (so that the result of the # future can be set). 
future = futures.Future() diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py index 30c76a44f..c623c2cef 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py @@ -23,7 +23,9 @@ from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base from google.cloud.pubsub_v1.publisher._batch import base as batch_base -from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud.pubsub_v1 import types @@ -262,7 +264,7 @@ def _create_batch( def publish( self, - message: gapic_types.PubsubMessage, + message: PublishMessageWrapper, retry: "OptionalRetry" = gapic_v1.method.DEFAULT, timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, ) -> futures.Future: diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py index 7d57aa821..1bf143b22 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py @@ -18,7 +18,10 @@ from google.api_core import gapic_v1 from google.cloud.pubsub_v1.publisher._sequencer import base -from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) + if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud.pubsub_v1.publisher import _batch @@ -115,7 +118,7 @@ def _create_batch( def publish( self, - message: gapic_types.PubsubMessage, + message: PublishMessageWrapper, retry: "OptionalRetry" = gapic_v1.method.DEFAULT, timeout: "types.OptionalTimeout" = 
gapic_v1.method.DEFAULT, ) -> "futures.Future": diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 54b353276..bd8450a27 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -22,6 +22,12 @@ import typing from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union import warnings +import sys +from datetime import datetime +from opentelemetry import trace +from opentelemetry.trace.propagation import set_span_in_context +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.propagators.textmap import Setter from google.api_core import gapic_v1 from google.auth.credentials import AnonymousCredentials # type: ignore @@ -34,10 +40,14 @@ from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer from google.cloud.pubsub_v1.publisher.flow_controller import FlowController +from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) from google.pubsub_v1 import gapic_version as package_version from google.pubsub_v1 import types as gapic_types from google.pubsub_v1.services.publisher import client as publisher_client + __version__ = package_version.__version__ if typing.TYPE_CHECKING: # pragma: NO COVER @@ -56,6 +66,10 @@ ordered_sequencer.OrderedSequencer, unordered_sequencer.UnorderedSequencer ] +_OPEN_TELEMETRY_TRACER_NAME = "google.cloud.pubsub_v1.publisher" +_OPEN_TELEMETRY_MESSAGING_SYSTEM = "gcp_pubsub" +_OPEN_TELEMETRY_PUBLISHER_BATCHING = "publisher batching" + class Client(publisher_client.PublisherClient): """A publisher client for Google Cloud Pub/Sub. 
@@ -153,6 +167,24 @@ def __init__( # The object controlling the message publishing flow self._flow_controller = FlowController(self.publisher_options.flow_control) + # OpenTelemetry features used by the library are not supported in Python versions <= 3.7. + # Refer https://github.com/open-telemetry/opentelemetry-python/issues/3993#issuecomment-2211976389 + if ( + self.publisher_options.enable_open_telemetry_tracing + and sys.version_info.major == 3 + and sys.version_info.minor < 8 + ): + warnings.warn( + message="OpenTelemetry for Python version 3.7 or lower is not supported. Disabling open telemetry tracing.", + category=RuntimeWarning, + ) + self._open_telemetry_enabled = False + else: + # Option indicating whether open telemetry is enabled or not. + self._open_telemetry_enabled = ( + self.publisher_options.enable_open_telemetry_tracing + ) + @classmethod def from_service_account_file( # type: ignore[override] cls, @@ -335,6 +367,7 @@ def publish( # type: ignore[override] pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing the ``message`` would exceed the max size limit on the backend. """ + # Sanity check: Is the data being sent as a bytestring? # If it is literally anything else, complain loudly about it. if not isinstance(data, bytes): @@ -368,13 +401,76 @@ def publish( # type: ignore[override] ) message = gapic_types.PubsubMessage.wrap(vanilla_pb) + class OTelContextSetter(Setter): + """ + Used by Open Telemetry for context propagation. + """ + + def set(self, carrier: gapic_types.PubsubMessage, key: str, value: str): + """ + Injects trace context into Pub/Sub message attributes with + "googclient_" prefix. 
+ """ + carrier.attributes["googclient_" + key] = value + + if self._open_telemetry_enabled: + tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) + with tracer.start_as_current_span( + name=f"{topic} create", + attributes={ + "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM, + "messaging.destination.name": topic, + "code.function": "google.cloud.pubsub.PublisherClient.publish", + "messaging.gcp_pubsub.message.ordering_key": ordering_key, + "messaging.operation": "create", + "gcp.project_id": topic.split("/")[1], + "messaging.message.body.size": sys.getsizeof(message.data), + }, + kind=trace.SpanKind.PRODUCER, + end_on_exit=False, + ) as publish_create_span: + publish_create_span.add_event( + name="publish start", + attributes={ + "timestamp": str(datetime.now()), + }, + ) + TraceContextTextMapPropagator().inject( + carrier=message, + setter=OTelContextSetter(), + ) + # Messages should go through flow control to prevent excessive # queuing on the client side (depending on the settings). 
try: + if self._open_telemetry_enabled: + with tracer.start_as_current_span( + name="publisher flow control", + kind=trace.SpanKind.INTERNAL, + context=set_span_in_context(publish_create_span), + end_on_exit=False, + ) as publish_flow_control_span: + pass self._flow_controller.add(message) + if self._open_telemetry_enabled: + publish_flow_control_span.end() except exceptions.FlowControlLimitError as exc: future = futures.Future() future.set_exception(exc) + if self._open_telemetry_enabled: + publish_flow_control_span.record_exception( + exception=exc, + ) + publish_flow_control_span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR) + ) + publish_flow_control_span.end() + + publish_create_span.record_exception(exc) + publish_create_span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR) + ) + publish_create_span.end() return future def on_publish_done(future): @@ -386,31 +482,78 @@ def on_publish_done(future): if timeout is gapic_v1.method.DEFAULT: # if custom timeout not passed in timeout = self.publisher_options.timeout + if self._open_telemetry_enabled: + with tracer.start_as_current_span( + name=_OPEN_TELEMETRY_PUBLISHER_BATCHING, + kind=trace.SpanKind.INTERNAL, + context=set_span_in_context(publish_create_span), + end_on_exit=False, + ) as publisher_batching_span: + pass with self._batch_lock: - if self._is_stopped: - raise RuntimeError("Cannot publish on a stopped publisher.") - - # Set retry timeout to "infinite" when message ordering is enabled. - # Note that this then also impacts messages added with an empty - # ordering key. 
- if self._enable_message_ordering: - if retry is gapic_v1.method.DEFAULT: - # use the default retry for the publish GRPC method as a base - transport = self._transport - base_retry = transport._wrapped_methods[transport.publish]._retry - retry = base_retry.with_deadline(2.0**32) - # timeout needs to be overridden and set to infinite in - # addition to the retry deadline since both determine - # the duration for which retries are attempted. - timeout = 2.0**32 - elif retry is not None: - retry = retry.with_deadline(2.0**32) - timeout = 2.0**32 - - # Delegate the publishing to the sequencer. - sequencer = self._get_or_create_sequencer(topic, ordering_key) - future = sequencer.publish(message, retry=retry, timeout=timeout) - future.add_done_callback(on_publish_done) + try: + if self._is_stopped: + raise RuntimeError("Cannot publish on a stopped publisher.") + + # Set retry timeout to "infinite" when message ordering is enabled. + # Note that this then also impacts messages added with an empty + # ordering key. + if self._enable_message_ordering: + if retry is gapic_v1.method.DEFAULT: + # use the default retry for the publish GRPC method as a base + transport = self._transport + base_retry = transport._wrapped_methods[ + transport.publish + ]._retry + retry = base_retry.with_deadline(2.0**32) + # timeout needs to be overridden and set to infinite in + # addition to the retry deadline since both determine + # the duration for which retries are attempted. + timeout = 2.0**32 + elif retry is not None: + retry = retry.with_deadline(2.0**32) + timeout = 2.0**32 + + # Delegate the publishing to the sequencer. 
+ sequencer = self._get_or_create_sequencer(topic, ordering_key) + + create_span = None + if self._open_telemetry_enabled: + create_span = publish_create_span + message_wrapper = PublishMessageWrapper( + message=message, + span=create_span, + ) + future = sequencer.publish( + message=message_wrapper, + retry=retry, + timeout=timeout, + ) + future.add_done_callback(on_publish_done) + except BaseException as be: + # Exceptions can be thrown when attempting to add messages to the batch. + # If they're thrown, record it in the publisher batching span before + # allowing it to bubble up. + if self._open_telemetry_enabled: + publisher_batching_span.record_exception( + exception=be, + ) + publisher_batching_span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR) + ) + publisher_batching_span.end() + + publish_create_span.record_exception( + exception=be, + ) + publish_create_span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR) + ) + publish_create_span.end() + raise be + + if self._open_telemetry_enabled: + publisher_batching_span.end() # Create a timer thread if necessary to enforce the batching # timeout. diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 15ad4abb3..c4a90023f 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -79,6 +79,13 @@ def __init__(self, manager: "StreamingPullManager", queue: "queue.Queue"): self._thread: Optional[threading.Thread] = None self._operational_lock = threading.Lock() + @property + def manager(self) -> "StreamingPullManager": + """Returns the Streaming Pull Manager associated with this dispatcher + instance. + """ + return self._manager + def start(self) -> None: """Start a thread to dispatch requests queued up by callbacks. 
@@ -179,7 +186,21 @@ def dispatch_callback(self, items: Sequence[RequestItem]) -> None: self.lease(lease_requests) if modack_requests: + for req in modack_requests: + if ( + req + and req.open_telemetry_data + and req.open_telemetry_data.subscribe_span + ): + req.open_telemetry_data.subscribe_span.add_event("modack start") self.modify_ack_deadline(modack_requests) + for req in modack_requests: + if ( + req + and req.open_telemetry_data + and req.open_telemetry_data.subscribe_span + ): + req.open_telemetry_data.subscribe_span.add_event("modack end") # Note: Drop and ack *must* be after lease. It's possible to get both # the lease and the ack/drop request in the same batch. @@ -243,6 +264,20 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None: ack_reqs_dict=ack_reqs_dict, ) + if self._manager.open_telemetry_enabled: + for completed_ack in requests_completed: + if completed_ack.open_telemetry_data: + subscribe_span = ( + completed_ack.open_telemetry_data.subscribe_span + ) + if subscribe_span: + subscribe_span.set_attribute( + key="messaging.gcp_pubsub.result", + value="ack", + ) + subscribe_span.add_event("ack end") + subscribe_span.end() + # Remove the completed messages from lease management. self.drop(requests_completed) @@ -289,6 +324,20 @@ def _retry_acks(self, requests_to_retry): assert ( len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE ), "Too many requests to be retried." + + if self._manager.open_telemetry_enabled: + for completed_ack in requests_completed: + if completed_ack.open_telemetry_data: + subscribe_span = ( + completed_ack.open_telemetry_data.subscribe_span + ) + if subscribe_span: + subscribe_span.set_attribute( + key="messaging.gcp_pubsub.result", + value="ack", + ) + subscribe_span.add_event("ack end") + subscribe_span.end() # Remove the completed messages from lease management. 
self.drop(requests_completed) @@ -342,9 +391,10 @@ def modify_ack_deadline( for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE) } requests_to_retry: List[requests.ModAckRequest] + requests_completed: List[requests.ModAckRequest] if default_deadline is None: # no further work needs to be done for `requests_to_retry` - _, requests_to_retry = self._manager.send_unary_modack( + requests_completed, requests_to_retry = self._manager.send_unary_modack( modify_deadline_ack_ids=list( itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE) ), @@ -355,7 +405,7 @@ def modify_ack_deadline( default_deadline=None, ) else: - _, requests_to_retry = self._manager.send_unary_modack( + requests_completed, requests_to_retry = self._manager.send_unary_modack( modify_deadline_ack_ids=itertools.islice( ack_ids_gen, _ACK_IDS_BATCH_SIZE ), @@ -375,7 +425,7 @@ def modify_ack_deadline( functools.partial(self._retry_modacks, requests_to_retry), ) - def _retry_modacks(self, requests_to_retry): + def _retry_modacks(self, requests_to_retry: List[requests.ModAckRequest]): retry_delay_gen = exponential_sleep_generator( initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS, maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS, @@ -405,11 +455,25 @@ def nack(self, items: Sequence[requests.NackRequest]) -> None: self.modify_ack_deadline( [ requests.ModAckRequest( - ack_id=item.ack_id, seconds=0, future=item.future + ack_id=item.ack_id, + seconds=0, + future=item.future, + open_telemetry_data=item.open_telemetry_data, ) for item in items ] ) + if self._manager.open_telemetry_enabled: + for item in items: + if item.open_telemetry_data: + subscribe_span = item.open_telemetry_data.subscribe_span + if subscribe_span: + subscribe_span.set_attribute( + key="messaging.gcp_pubsub.result", + value="nack", + ) + subscribe_span.add_event("nack end") + self.drop( [ requests.DropRequest( diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py 
b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index 16018e384..68b82adb9 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -22,6 +22,8 @@ import typing from typing import Dict, Iterable, Optional, Union +from opentelemetry import trace + from google.cloud.pubsub_v1.subscriber._protocol.dispatcher import _MAX_BATCH_LATENCY try: @@ -42,6 +44,9 @@ _LOGGER = logging.getLogger(__name__) _LEASE_WORKER_NAME = "Thread-LeaseMaintainer" +_OPEN_TELEMETRY_TRACER_NAME = "google.cloud.pubsub_v1.subscriber" +"""Open Telemetry Instrumenting module name.""" +_OPEN_TELEMETRY_MESSAGING_SYSTEM = "gcp_pubsub" class _LeasedMessage(typing.NamedTuple): @@ -50,6 +55,7 @@ class _LeasedMessage(typing.NamedTuple): size: int ordering_key: Optional[str] + subscribe_span: Optional[trace.Span] class Leaser(object): @@ -98,6 +104,9 @@ def add(self, items: Iterable[requests.LeaseRequest]) -> None: sent_time=float("inf"), size=item.byte_size, ordering_key=item.ordering_key, + subscribe_span=item.subscribe_span + if item.subscribe_span + else None, ) self._bytes += item.byte_size else: @@ -190,7 +199,38 @@ def maintain_leases(self) -> None: expired_ack_ids = set() if ack_ids: _LOGGER.debug("Renewing lease for %d ack IDs.", len(ack_ids)) - + if self._manager.open_telemetry_enabled: + subscriber_span_links = [] + lease_requests = leased_messages.values() + for lease_request in lease_requests: + if lease_request.subscribe_span is not None: + subscriber_span_links.append( + trace.Link( + lease_request.subscribe_span.get_span_context() + ) + ) + tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) + with tracer.start_as_current_span( + name=f"{self._manager._subscription} modack", + attributes={ + "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM, + "messaging.batch.message_count": len(ack_ids), + "messaging.gcp_pubsub.message.ack_deadline": deadline, + "messaging.gcp_pubsub.is_receipt_modack": False, + 
"messaging.destination.name": self._manager._subscription.split( + "/" + )[ + 3 + ], + "gcp.project_id": self._manager._subscription.split("/")[1], + "messaging.operation.name": "modack", + "code.function": "maintain_leases", + }, + links=subscriber_span_links if subscriber_span_links else None, + kind=trace.SpanKind.CLIENT, + end_on_exit=False, + ) as modack_span: + pass # NOTE: This may not work as expected if ``consumer.active`` # has changed since we checked it. An implementation # without any sort of race condition would require a @@ -202,6 +242,9 @@ def maintain_leases(self) -> None: ack_id_gen, deadline ) + if self._manager.open_telemetry_enabled: + modack_span.end() + start_time = time.time() # If exactly once delivery is enabled, we should drop all expired ack_ids from lease management. if self._manager._exactly_once_delivery_enabled() and len(expired_ack_ids): diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py index 63c2edbfa..958bcdde7 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py @@ -17,12 +17,18 @@ import typing from typing import Any, Callable, Iterable, Optional +from opentelemetry import trace +from opentelemetry.trace.propagation import set_span_in_context + if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud.pubsub_v1 import subscriber _LOGGER = logging.getLogger(__name__) +_OPEN_TELEMETRY_TRACER_NAME = "google.cloud.pubsub_v1.subscriber" +"""Open Telemetry Instrumenting module name.""" + class MessagesOnHold(object): """Tracks messages on hold by ordering key. Not thread-safe.""" @@ -48,6 +54,8 @@ def __init__(self): # flight, but there are no pending messages. 
self._pending_ordered_messages = {} + self._tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) + @property def size(self) -> int: """Return the number of messages on hold across ordered and unordered messages. @@ -100,6 +108,16 @@ def put(self, message: "subscriber.message.Message") -> None: Args: message: The message to put on hold. """ + if message.open_telemetry_data: + with self._tracer.start_as_current_span( + name="subscriber scheduler", + kind=trace.SpanKind.INTERNAL, + context=set_span_in_context(message.open_telemetry_data.subscribe_span) + if message.open_telemetry_data.subscribe_span + else None, + end_on_exit=False, + ) as scheduler_span: + message.open_telemetry_data.scheduler_span = scheduler_span self._messages_on_hold.append(message) self._size = self._size + 1 diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/requests.py b/google/cloud/pubsub_v1/subscriber/_protocol/requests.py index 9cd387545..ec688a646 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/requests.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/requests.py @@ -15,6 +15,10 @@ import typing from typing import NamedTuple, Optional +from opentelemetry import trace + +from google.cloud.pubsub_v1.opentelemetry.subscribe_spans_data import OpenTelemetryData + if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud.pubsub_v1.subscriber import futures @@ -27,6 +31,7 @@ class AckRequest(NamedTuple): time_to_ack: float ordering_key: Optional[str] future: Optional["futures.Future"] + open_telemetry_data: Optional[OpenTelemetryData] = None class DropRequest(NamedTuple): @@ -39,12 +44,14 @@ class LeaseRequest(NamedTuple): ack_id: str byte_size: int ordering_key: Optional[str] + subscribe_span: Optional[trace.Span] = None class ModAckRequest(NamedTuple): ack_id: str seconds: float future: Optional["futures.Future"] + open_telemetry_data: Optional[OpenTelemetryData] = None class NackRequest(NamedTuple): @@ -52,3 +59,4 @@ class NackRequest(NamedTuple): byte_size: int 
ordering_key: Optional[str] future: Optional["futures.Future"] + open_telemetry_data: Optional[OpenTelemetryData] = None diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index b8531db17..1c87bc5fc 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -25,6 +25,11 @@ import grpc # type: ignore +from opentelemetry import trace +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from google.cloud.pubsub_v1.opentelemetry.subscribe_spans_data import OpenTelemetryData +from opentelemetry.trace.propagation import set_span_in_context + from google.api_core import bidi from google.api_core import exceptions from google.cloud.pubsub_v1 import types @@ -34,6 +39,7 @@ from google.cloud.pubsub_v1.subscriber._protocol import leaser from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold from google.cloud.pubsub_v1.subscriber._protocol import requests +from google.cloud.pubsub_v1.opentelemetry.context_propagation import OTelContextGetter from google.cloud.pubsub_v1.subscriber.exceptions import ( AcknowledgeError, AcknowledgeStatus, @@ -91,6 +97,10 @@ code_pb2.UNAVAILABLE, } +_OPEN_TELEMETRY_TRACER_NAME = "google.cloud.pubsub_v1.subscriber" +"""Open Telemetry Instrumenting module name.""" +_OPEN_TELEMETRY_MESSAGING_SYSTEM = "gcp_pubsub" + def _wrap_as_exception(maybe_exception: Any) -> BaseException: """Wrap an object as a Python exception, if needed. 
@@ -113,6 +123,7 @@ def _wrap_as_exception(maybe_exception: Any) -> BaseException: def _wrap_callback_errors( callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any], on_callback_error: Callable[[BaseException], Any], + subscription: str, message: "google.cloud.pubsub_v1.subscriber.message.Message", ): """Wraps a user callback so that if an exception occurs the message is @@ -123,6 +134,29 @@ def _wrap_callback_errors( message: The Pub/Sub message. """ try: + if message.open_telemetry_data: + if message.open_telemetry_data.concurrency_control_span: + message.open_telemetry_data.concurrency_control_span.end() + if message.open_telemetry_data.scheduler_span: + message.open_telemetry_data.scheduler_span.end() + tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) + subscribe_span = message.open_telemetry_data.subscribe_span + publisher_create_span_link = None + if subscribe_span and subscribe_span.parent: + publisher_create_span_link = trace.Link(subscribe_span.parent) + with tracer.start_as_current_span( + name=f"{subscription.split('/')[3]} process", + attributes={ + "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM, + }, + kind=trace.SpanKind.INTERNAL, + context=set_span_in_context(subscribe_span) if subscribe_span else None, + links=[publisher_create_span_link] + if publisher_create_span_link + else None, + end_on_exit=False, + ) as process_span: + message.open_telemetry_data.process_span = process_span callback(message) except BaseException as exc: # Note: the likelihood of this failing is extremely low. 
This just adds @@ -272,6 +306,7 @@ def __init__( await_callbacks_on_shutdown: bool = False, ): self._client = client + self._open_telemetry_enabled = client.open_telemetry_enabled self._subscription = subscription self._exactly_once_enabled = False self._flow_control = flow_control @@ -365,6 +400,12 @@ def __init__( self._leaser: Optional[leaser.Leaser] = None self._consumer: Optional[bidi.BackgroundConsumer] = None self._heartbeater: Optional[heartbeater.Heartbeater] = None + self._tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) + + @property + def open_telemetry_enabled(self) -> bool: + """Whether open telemetry is enabled.""" + return self._open_telemetry_enabled @property def is_active(self) -> bool: @@ -618,6 +659,20 @@ def _schedule_message_on_hold( ) assert self._scheduler is not None assert self._callback is not None + if ( + self.open_telemetry_enabled + and msg.open_telemetry_data + and msg.open_telemetry_data.subscribe_span + ): + with self._tracer.start_as_current_span( + name="subscriber concurrency control", + kind=trace.SpanKind.INTERNAL, + context=set_span_in_context(msg.open_telemetry_data.subscribe_span), + end_on_exit=False, + ) as concurrency_control_span: + msg.open_telemetry_data.concurrency_control_span = ( + concurrency_control_span + ) self._scheduler.schedule(self._callback, msg) def send_unary_ack( @@ -834,7 +889,10 @@ def open( raise ValueError("This manager has been closed and can not be re-used.") self._callback = functools.partial( - _wrap_callback_errors, callback, on_callback_error + _wrap_callback_errors, + callback, + on_callback_error, + self._subscription, ) # Create the RPC @@ -1075,6 +1133,35 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: # protobuf message to significantly gain on attribute access performance. 
received_messages = response._pb.received_messages + subscribe_spans = [] + if self.open_telemetry_enabled: + for received_message in response.received_messages: + parent_span_context = TraceContextTextMapPropagator().extract( + carrier=received_message.message, + getter=OTelContextGetter(), + ) + with self._tracer.start_as_current_span( + name=f"{self._subscription} subscribe", + kind=trace.SpanKind.CONSUMER, + context=parent_span_context if parent_span_context else None, + attributes={ + "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM, + "messaging.destination.name": self._subscription, + "gcp.project_id": self._subscription.split("/")[1], + "messaging.message.id": received_message.message.message_id, + "messaging.message.body.size": len( + received_message.message.data + ), + "messaging.gcp_pubsub.message.ack_id": received_message.ack_id, + "messaging.gcp_pubsub.message.ordering_key": received_message.message.ordering_key, + "messaging.gcp_pubsub.message.exactly_once_delivery": response.subscription_properties.exactly_once_delivery_enabled, + "code.function": "_on_response", + "messaging.gcp_pubsub.message.delivery_attempt": received_message.delivery_attempt, + }, + end_on_exit=False, + ) as subscribe_span: + subscribe_spans.append(subscribe_span) + _LOGGER.debug( "Processing %s received message(s), currently on hold %s (bytes %s).", len(received_messages), @@ -1098,15 +1185,44 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: # Immediately (i.e. without waiting for the auto lease management) # modack the messages we received, as this tells the server that we've # received them. 
+ if self.open_telemetry_enabled: + subscriber_span_links = [] + for subscribe_span in subscribe_spans: + subscribe_span.add_event("modack start") + subscriber_span_links.append( + trace.Link(subscribe_span.get_span_context()) + ) + with self._tracer.start_as_current_span( + name=f"{self._subscription.split('/')[3]} modack", + attributes={ + "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM, + "messaging.batch.message_count": len(received_messages), + "messaging.gcp_pubsub.message.ack_deadline": self.ack_deadline, + "messaging.gcp_pubsub.is_receipt_modack": True, + "messaging.destination.name": self._subscription.split("/")[3], + "gcp.project_id": self._subscription.split("/")[1], + "messaging.operation.name": "modack", + "code.function": "_on_response", + }, + links=subscriber_span_links if subscriber_span_links else None, + kind=trace.SpanKind.CLIENT, + end_on_exit=False, + ) as receipt_modack_span: + pass ack_id_gen = (message.ack_id for message in received_messages) expired_ack_ids = self._send_lease_modacks( ack_id_gen, self.ack_deadline, warn_on_invalid=False ) + if self.open_telemetry_enabled: + for subscribe_span in subscribe_spans: + subscribe_span.add_event("modack end") + receipt_modack_span.end() with self._pause_resume_lock: assert self._scheduler is not None assert self._leaser is not None + i = 0 for received_message in received_messages: if ( not self._exactly_once_delivery_enabled() @@ -1119,14 +1235,22 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: self._scheduler.queue, self._exactly_once_delivery_enabled, ) + if self.open_telemetry_enabled: + message._open_telemetry_data = OpenTelemetryData( + subscribe_span=subscribe_spans[i], + ) self._messages_on_hold.put(message) self._on_hold_bytes += message.size req = requests.LeaseRequest( ack_id=message.ack_id, byte_size=message.size, ordering_key=message.ordering_key, + subscribe_span=subscribe_spans[i] + if self.open_telemetry_enabled + else None, ) 
self._leaser.add([req]) + i = i + 1 self._maybe_release_messages() diff --git a/google/cloud/pubsub_v1/subscriber/client.py b/google/cloud/pubsub_v1/subscriber/client.py index 0d0d36a0c..160bb0a13 100644 --- a/google/cloud/pubsub_v1/subscriber/client.py +++ b/google/cloud/pubsub_v1/subscriber/client.py @@ -67,7 +67,15 @@ class Client(subscriber_client.SubscriberClient): ) """ - def __init__(self, **kwargs: Any): + def __init__( + self, + subscriber_options: Union[types.SubscriberOptions, Sequence] = (), + **kwargs: Any + ): + assert ( + isinstance(subscriber_options, types.SubscriberOptions) + or len(subscriber_options) == 0 + ), "subscriber_options must be of type SubscriberOptions or an empty sequence." # Sanity check: Is our goal to use the emulator? # If so, create a grpc insecure channel with the emulator host # as the target. @@ -82,6 +90,12 @@ def __init__(self, **kwargs: Any): self._target = self._transport._host self._closed = False + self.subscriber_options = types.SubscriberOptions(*subscriber_options) + # Option indicating whether open telemetry is enabled or not. + self._open_telemetry_enabled = ( + self.subscriber_options.enable_open_telemetry_tracing + ) + @classmethod def from_service_account_file( # type: ignore[override] cls, filename: str, **kwargs: Any @@ -103,6 +117,11 @@ def from_service_account_file( # type: ignore[override] from_service_account_json = from_service_account_file # type: ignore[assignment] + @property + def open_telemetry_enabled(self) -> bool: + """Returns whether Open Telemetry is enabled for the subscriber client.""" + return self._open_telemetry_enabled + @property def target(self) -> str: """Return the target (where the API is). 
diff --git a/google/cloud/pubsub_v1/subscriber/message.py b/google/cloud/pubsub_v1/subscriber/message.py index f744966a2..2a5e222aa 100644 --- a/google/cloud/pubsub_v1/subscriber/message.py +++ b/google/cloud/pubsub_v1/subscriber/message.py @@ -25,6 +25,8 @@ from google.cloud.pubsub_v1.subscriber import futures from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus +from google.cloud.pubsub_v1.opentelemetry.subscribe_spans_data import OpenTelemetryData + if typing.TYPE_CHECKING: # pragma: NO COVER import datetime @@ -94,6 +96,7 @@ def __init__( delivery_attempt: int, request_queue: "queue.Queue", exactly_once_delivery_enabled_func: Callable[[], bool] = lambda: False, + open_telemetry_data: Optional[OpenTelemetryData] = None, ): """Construct the Message. @@ -119,6 +122,8 @@ responsible for handling those requests. exactly_once_delivery_enabled_func (Callable[[], bool]): A Callable that returns whether exactly-once delivery is currently-enabled. Defaults to a lambda that always returns False. + open_telemetry_data (Optional[OpenTelemetryData]): + Internal only. Contains Open Telemetry data associated with the message. """ self._message = message self._ack_id = ack_id @@ -126,6 +131,7 @@ self._request_queue = request_queue self._exactly_once_delivery_enabled_func = exactly_once_delivery_enabled_func self.message_id = message.message_id + self._open_telemetry_data = open_telemetry_data # The instantiation time is the time that this message # was received. Tracking this provides us a way to be smart about @@ -232,6 +238,10 @@ def delivery_attempt(self) -> Optional[int]: """ return self._delivery_attempt + @property + def open_telemetry_data(self) -> Optional[OpenTelemetryData]: + return self._open_telemetry_data + def ack(self) -> None: """Acknowledge the given message. 
@@ -253,6 +263,13 @@ def ack(self) -> None: """ time_to_ack = math.ceil(time.time() - self._received_timestamp) + if self._open_telemetry_data: + subscriber_span = self._open_telemetry_data.subscribe_span + if subscriber_span: + subscriber_span.add_event(name="ack start") + process_span = self._open_telemetry_data.process_span + if process_span: + process_span.add_event(name="ack called") self._request_queue.put( requests.AckRequest( ack_id=self._ack_id, @@ -260,6 +277,7 @@ def ack(self) -> None: time_to_ack=time_to_ack, ordering_key=self.ordering_key, future=None, + open_telemetry_data=self._open_telemetry_data, ) ) @@ -302,6 +320,13 @@ def ack_with_response(self) -> "futures.Future": pubsub_v1.subscriber.exceptions.AcknowledgeError exception will be thrown. """ + if self._open_telemetry_data: + suscriber_span = self._open_telemetry_data.subscribe_span + if suscriber_span: + suscriber_span.add_event(name="ack start") + process_span = self._open_telemetry_data.process_span + if process_span: + process_span.add_event(name="ack called") req_future: Optional[futures.Future] if self._exactly_once_delivery_enabled_func(): future = futures.Future() @@ -317,6 +342,7 @@ def ack_with_response(self) -> "futures.Future": time_to_ack=time_to_ack, ordering_key=self.ordering_key, future=req_future, + open_telemetry_data=self._open_telemetry_data, ) ) return future @@ -429,12 +455,20 @@ def nack(self) -> None: may take place immediately or after a delay, and may arrive at this subscriber or another. 
""" + if self._open_telemetry_data: + subscriber_span = self._open_telemetry_data.subscribe_span + if subscriber_span: + subscriber_span.add_event("nack start") + process_span = self._open_telemetry_data.process_span + if process_span: + process_span.add_event("nack called") self._request_queue.put( requests.NackRequest( ack_id=self._ack_id, byte_size=self.size, ordering_key=self.ordering_key, future=None, + open_telemetry_data=self._open_telemetry_data, ) ) @@ -472,6 +506,13 @@ def nack_with_response(self) -> "futures.Future": will be thrown. """ + if self._open_telemetry_data: + subscriber_span = self._open_telemetry_data.subscribe_span + if subscriber_span: + subscriber_span.add_event("nack start") + process_span = self._open_telemetry_data.process_span + if process_span: + process_span.add_event("nack called") req_future: Optional[futures.Future] if self._exactly_once_delivery_enabled_func(): future = futures.Future() @@ -486,6 +527,7 @@ def nack_with_response(self) -> "futures.Future": byte_size=self.size, ordering_key=self.ordering_key, future=req_future, + open_telemetry_data=self._open_telemetry_data, ) ) diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index 3d071a189..a5309c9a7 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -131,6 +131,19 @@ class PublishFlowControl(NamedTuple): """The action to take when publish flow control limits are exceeded.""" +class SubscriberOptions(NamedTuple): + """ + Options for the subscriber client. + + Attributes: + enable_open_telemetry_tracing (bool): + Whether to enable OpenTelemetry tracing. Defaults to false. + """ + + enable_open_telemetry_tracing: bool = False + """Whether to enable OpenTelemetry tracing.""" + + # Define the default publisher options. 
# # This class is used when creating a publisher client to pass in options @@ -151,6 +164,8 @@ class PublisherOptions(NamedTuple): timeout (OptionalTimeout): Timeout settings for message publishing by the client. It should be compatible with :class:`~.pubsub_v1.types.TimeoutType`. + enable_open_telemetry_tracing (bool): + Whether to enable OpenTelemetry tracing. Defaults to false. """ enable_message_ordering: bool = False @@ -174,6 +189,9 @@ class PublisherOptions(NamedTuple): "compatible with :class:`~.pubsub_v1.types.TimeoutType`." ) + enable_open_telemetry_tracing: bool = False + """Whether to enable OpenTelemetry tracing.""" + # Define the type class and default values for flow control settings. # diff --git a/publisher_trace_provider.py b/publisher_trace_provider.py new file mode 100644 index 000000000..a6f109387 --- /dev/null +++ b/publisher_trace_provider.py @@ -0,0 +1,59 @@ +# Utility class that implements tracerProvider and can be used to test the spans created during development +from typing import Callable +from google.cloud import pubsub_v1 +from concurrent import futures +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, + ConsoleSpanExporter, +) + +provider = TracerProvider() +processor = BatchSpanProcessor(ConsoleSpanExporter()) +provider.add_span_processor(processor) + +# Sets the global default tracer provider +trace.set_tracer_provider(provider) + +# Creates a tracer from the global tracer provider +# tracer = trace.get_tracer("my.tracer.name") + +project_id = "cloud-pubsub-load-tests" +topic_id = "mukund-topic" + +publisher = pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions( + enable_open_telemetry_tracing=True, +)) +topic_path = publisher.topic_path(project_id, topic_id) +publish_futures = [] + +# with tracer.start_as_current_span("tracer provider"): + + +def get_callback( + publish_future: 
pubsub_v1.publisher.futures.Future, data: str +) -> Callable[[pubsub_v1.publisher.futures.Future], None]: + def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None: + try: + # Wait 60 seconds for the publish call to succeed. + print(publish_future.result(timeout=60)) + except futures.TimeoutError: + print(f"Publishing {data} timed out.") + + return callback + + +NUM_MESSAGES_TO_PUBLISH = 1 +for i in range(NUM_MESSAGES_TO_PUBLISH): + data = str(i) + # When you publish a message, the client returns a future. + publish_future = publisher.publish(topic_path, data.encode("utf-8")) + # Non-blocking. Publish failures are handled in the callback function. + publish_future.add_done_callback(get_callback(publish_future, data)) + publish_futures.append(publish_future) + +# Wait for all the publish futures to resolve before exiting. +futures.wait(publish_futures, return_when=futures.ALL_COMPLETED) + +print(f"Published messages with error handler to {topic_path}.") diff --git a/setup.py b/setup.py index a6af31207..d3b584d5e 100644 --- a/setup.py +++ b/setup.py @@ -45,6 +45,8 @@ "protobuf>=3.19.5,<5.0.0dev,!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5", "grpc-google-iam-v1 >= 0.12.4, < 1.0.0dev", "grpcio-status >= 1.33.2", + "opentelemetry-api", + "opentelemetry-sdk", ] extras = {"libcst": "libcst >= 0.3.10"} url = "https://github.com/googleapis/python-pubsub" diff --git a/tests/unit/pubsub_v1/conftest.py b/tests/unit/pubsub_v1/conftest.py index dc4192931..f3cd421e3 100644 --- a/tests/unit/pubsub_v1/conftest.py +++ b/tests/unit/pubsub_v1/conftest.py @@ -12,6 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry import trace + import google.auth.credentials import pytest @@ -23,3 +28,18 @@ def creds(): GOOGLE_APPLICATION_CREDENTIALS set. """ yield google.auth.credentials.AnonymousCredentials() + + +@pytest.fixture(scope="session", autouse=True) +def set_trace_provider(): + provider = TracerProvider() + trace.set_tracer_provider(provider) + + +@pytest.fixture(scope="function") +def span_exporter(): + exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(exporter) + provider = trace.get_tracer_provider() + provider.add_span_processor(processor) + yield exporter diff --git a/tests/unit/pubsub_v1/publisher/batch/test_base.py b/tests/unit/pubsub_v1/publisher/batch/test_base.py index a95d72c12..8da1a1aa6 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_base.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_base.py @@ -20,6 +20,9 @@ from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.publisher._batch.base import BatchStatus from google.cloud.pubsub_v1.publisher._batch.thread import Batch +from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) from google.pubsub_v1 import types as gapic_types @@ -41,5 +44,10 @@ def create_batch(status, settings=types.BatchSettings()): def test_len(): batch = create_batch(status=BatchStatus.ACCEPTING_MESSAGES) assert len(batch) == 0 - batch.publish(gapic_types.PubsubMessage(data=b"foo")) + batch.publish( + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo"), + span=None, + ), + ) assert len(batch) == 1 diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 2752d62a2..57a775f6f 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ 
b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -25,6 +25,8 @@ import pytest +from opentelemetry import trace + import google.api_core.exceptions from google.api_core import gapic_v1 from google.auth import credentials @@ -36,10 +38,18 @@ from google.cloud.pubsub_v1.publisher._batch import thread from google.cloud.pubsub_v1.publisher._batch.thread import Batch from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) -def create_client(): - return publisher.Client(credentials=credentials.AnonymousCredentials()) +def create_client(enable_open_telemetry=False): + return publisher.Client( + credentials=credentials.AnonymousCredentials(), + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=enable_open_telemetry, + ), + ) def create_batch( @@ -48,7 +58,8 @@ def create_batch( commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, - **batch_settings + enable_open_telemetry: bool = False, + **batch_settings, ): """Return a batch object suitable for testing. @@ -68,7 +79,7 @@ def create_batch( Returns: ~.pubsub_v1.publisher.batch.thread.Batch: A batch object. 
""" - client = create_client() + client = create_client(enable_open_telemetry=enable_open_telemetry) settings = types.BatchSettings(**batch_settings) return Batch( client, @@ -126,8 +137,18 @@ def test_commit_no_op(): def test_blocking__commit(): batch = create_batch() futures = ( - batch.publish({"data": b"This is my message."}), - batch.publish({"data": b"This is another message."}), + batch.publish( + message_wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is my message."), + span=None, + ) + ), + batch.publish( + message_wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is another message."), + span=None, + ) + ), ) # Set up the underlying API publish method to return a PublishResponse. @@ -160,7 +181,12 @@ def test_blocking__commit(): def test_blocking__commit_custom_retry(): batch = create_batch(commit_retry=mock.sentinel.custom_retry) - batch.publish({"data": b"This is my message."}) + batch.publish( + message_wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is my message."), + span=None, + ) + ) # Set up the underlying API publish method to return a PublishResponse. publish_response = gapic_types.PublishResponse(message_ids=["a"]) @@ -182,7 +208,12 @@ def test_blocking__commit_custom_retry(): def test_blocking__commit_custom_timeout(): batch = create_batch(commit_timeout=mock.sentinel.custom_timeout) - batch.publish({"data": b"This is my message."}) + batch.publish( + message_wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is my message."), + span=None, + ) + ) # Set up the underlying API publish method to return a PublishResponse. 
publish_response = gapic_types.PublishResponse(message_ids=["a"]) @@ -217,13 +248,23 @@ def api_publish_delay(topic="", messages=(), retry=None, timeout=None): ) with api_publish_patch: - batch.publish({"data": b"first message"}) + batch.publish( + message_wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"first message"), + span=None, + ) + ) start = datetime.datetime.now() event_set = api_publish_called.wait(timeout=1.0) if not event_set: # pragma: NO COVER pytest.fail("API publish was not called in time") - batch.publish({"data": b"second message"}) + batch.publish( + message_wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"second message"), + span=None, + ) + ) end = datetime.datetime.now() # While a batch commit in progress, waiting for the API publish call to @@ -266,8 +307,18 @@ def test_blocking__commit_no_messages(): def test_blocking__commit_wrong_messageid_length(): batch = create_batch() futures = ( - batch.publish({"data": b"blah blah blah"}), - batch.publish({"data": b"blah blah blah blah"}), + batch.publish( + message_wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah"), + span=None, + ) + ), + batch.publish( + message_wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah blah"), + span=None, + ) + ), ) # Set up a PublishResponse that only returns one message ID. @@ -287,8 +338,18 @@ def test_blocking__commit_wrong_messageid_length(): def test_block__commmit_api_error(): batch = create_batch() futures = ( - batch.publish({"data": b"blah blah blah"}), - batch.publish({"data": b"blah blah blah blah"}), + batch.publish( + message_wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah"), + span=None, + ) + ), + batch.publish( + message_wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah blah"), + span=None, + ) + ), ) # Make the API throw an error when publishing. 
@@ -306,8 +367,18 @@ def test_block__commmit_api_error(): def test_block__commmit_retry_error(): batch = create_batch() futures = ( - batch.publish({"data": b"blah blah blah"}), - batch.publish({"data": b"blah blah blah blah"}), + batch.publish( + message_wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah"), + span=None, + ) + ), + batch.publish( + message_wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah blah"), + span=None, + ) + ), ) # Make the API throw an error when publishing. @@ -325,23 +396,30 @@ def test_block__commmit_retry_error(): def test_publish_updating_batch_size(): batch = create_batch(topic="topic_foo") messages = ( - gapic_types.PubsubMessage(data=b"foobarbaz"), - gapic_types.PubsubMessage(data=b"spameggs"), - gapic_types.PubsubMessage(data=b"1335020400"), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz"), span=None + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"spameggs"), span=None + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"1335020400"), span=None + ), ) # Publish each of the messages, which should save them to the batch. futures = [batch.publish(message) for message in messages] # There should be three messages on the batch, and three futures. - assert len(batch.messages) == 3 + assert len(batch.message_wrappers) == 3 assert batch._futures == futures # The size should have been incremented by the sum of the size # contributions of each message to the PublishRequest. 
base_request_size = gapic_types.PublishRequest(topic="topic_foo")._pb.ByteSize() expected_request_size = base_request_size + sum( - gapic_types.PublishRequest(messages=[msg])._pb.ByteSize() for msg in messages + gapic_types.PublishRequest(messages=[msg.message])._pb.ByteSize() + for msg in messages ) assert batch.size == expected_request_size @@ -350,22 +428,22 @@ def test_publish_updating_batch_size(): def test_publish(): batch = create_batch() - message = gapic_types.PubsubMessage() - future = batch.publish(message) + wrapper = PublishMessageWrapper(gapic_types.PubsubMessage(), None) + future = batch.publish(wrapper) - assert len(batch.messages) == 1 + assert len(batch.message_wrappers) == 1 assert batch._futures == [future] def test_publish_max_messages_zero(): batch = create_batch(topic="topic_foo", max_messages=0) - message = gapic_types.PubsubMessage(data=b"foobarbaz") + wrapper = PublishMessageWrapper(gapic_types.PubsubMessage(data=b"foobarbaz"), None) with mock.patch.object(batch, "commit") as commit: - future = batch.publish(message) + future = batch.publish(wrapper) assert future is not None - assert len(batch.messages) == 1 + assert len(batch.message_wrappers) == 1 assert batch._futures == [future] commit.assert_called_once() @@ -373,30 +451,34 @@ def test_publish_max_messages_zero(): def test_publish_max_messages_enforced(): batch = create_batch(topic="topic_foo", max_messages=1) - message = gapic_types.PubsubMessage(data=b"foobarbaz") - message2 = gapic_types.PubsubMessage(data=b"foobarbaz2") + message = PublishMessageWrapper(gapic_types.PubsubMessage(data=b"foobarbaz"), None) + message2 = PublishMessageWrapper( + gapic_types.PubsubMessage(data=b"foobarbaz2"), None + ) future = batch.publish(message) future2 = batch.publish(message2) assert future is not None assert future2 is None - assert len(batch.messages) == 1 + assert len(batch.message_wrappers) == 1 assert len(batch._futures) == 1 def test_publish_max_bytes_enforced(): batch = 
create_batch(topic="topic_foo", max_bytes=15) - message = gapic_types.PubsubMessage(data=b"foobarbaz") - message2 = gapic_types.PubsubMessage(data=b"foobarbaz2") + message = PublishMessageWrapper(gapic_types.PubsubMessage(data=b"foobarbaz"), None) + message2 = PublishMessageWrapper( + gapic_types.PubsubMessage(data=b"foobarbaz2"), None + ) future = batch.publish(message) future2 = batch.publish(message2) assert future is not None assert future2 is None - assert len(batch.messages) == 1 + assert len(batch.message_wrappers) == 1 assert len(batch._futures) == 1 @@ -404,9 +486,9 @@ def test_publish_exceed_max_messages(): max_messages = 4 batch = create_batch(max_messages=max_messages) messages = ( - gapic_types.PubsubMessage(data=b"foobarbaz"), - gapic_types.PubsubMessage(data=b"spameggs"), - gapic_types.PubsubMessage(data=b"1335020400"), + PublishMessageWrapper(gapic_types.PubsubMessage(data=b"foobarbaz"), None), + PublishMessageWrapper(gapic_types.PubsubMessage(data=b"spameggs"), None), + PublishMessageWrapper(gapic_types.PubsubMessage(data=b"1335020400"), None), ) # Publish each of the messages, which should save them to the batch. @@ -420,7 +502,9 @@ def test_publish_exceed_max_messages(): # When a fourth message is published, commit should be called. # No future will be returned in this case. 
- future = batch.publish(gapic_types.PubsubMessage(data=b"last one")) + future = batch.publish( + PublishMessageWrapper(gapic_types.PubsubMessage(data=b"last one"), None) + ) commit.assert_called_once_with() assert future is None @@ -443,7 +527,7 @@ def test_publish_single_message_size_exceeds_server_size_limit(): assert request_size == 1001 # sanity check, just above the (mocked) server limit with pytest.raises(exceptions.MessageTooLargeError): - batch.publish(big_message) + batch.publish(PublishMessageWrapper(big_message, None)) @mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000) @@ -463,8 +547,8 @@ def test_publish_total_messages_size_exceeds_server_size_limit(): assert 1000 < request_size < 1500 with mock.patch.object(batch, "commit") as fake_commit: - batch.publish(messages[0]) - batch.publish(messages[1]) + batch.publish(PublishMessageWrapper(messages[0], None)) + batch.publish(PublishMessageWrapper(messages[1], None)) # The server side limit should kick in and cause a commit. fake_commit.assert_called_once() @@ -472,21 +556,32 @@ def test_publish_total_messages_size_exceeds_server_size_limit(): def test_publish_dict(): batch = create_batch() - future = batch.publish({"data": b"foobarbaz", "attributes": {"spam": "eggs"}}) + message = PublishMessageWrapper( + gapic_types.PubsubMessage(data=b"foobarbaz", attributes={"spam": "eggs"}), None + ) + future = batch.publish(message) # There should be one message on the batch. 
- expected_message = gapic_types.PubsubMessage( - data=b"foobarbaz", attributes={"spam": "eggs"} + expected_message = PublishMessageWrapper( + gapic_types.PubsubMessage(data=b"foobarbaz", attributes={"spam": "eggs"}), None ) - assert batch.messages == [expected_message] + assert batch.message_wrappers == [expected_message] assert batch._futures == [future] def test_cancel(): batch = create_batch() futures = ( - batch.publish({"data": b"This is my message."}), - batch.publish({"data": b"This is another message."}), + batch.publish( + PublishMessageWrapper( + gapic_types.PubsubMessage(data=b"This is my message."), None + ) + ), + batch.publish( + PublishMessageWrapper( + gapic_types.PubsubMessage(data=b"This is another message."), None + ) + ), ) batch.cancel(BatchCancellationReason.PRIOR_ORDERED_MESSAGE_FAILED) @@ -503,9 +598,9 @@ def test_do_not_commit_when_full_when_flag_is_off(): # Set commit_when_full flag to False batch = create_batch(max_messages=max_messages, commit_when_full=False) messages = ( - gapic_types.PubsubMessage(data=b"foobarbaz"), - gapic_types.PubsubMessage(data=b"spameggs"), - gapic_types.PubsubMessage(data=b"1335020400"), + PublishMessageWrapper(gapic_types.PubsubMessage(data=b"foobarbaz"), None), + PublishMessageWrapper(gapic_types.PubsubMessage(data=b"spameggs"), None), + PublishMessageWrapper(gapic_types.PubsubMessage(data=b"1335020400"), None), ) with mock.patch.object(batch, "commit") as commit: @@ -514,7 +609,9 @@ def test_do_not_commit_when_full_when_flag_is_off(): assert len(futures) == 3 # When a fourth message is published, commit should not be called. 
- future = batch.publish(gapic_types.PubsubMessage(data=b"last one")) + future = batch.publish( + PublishMessageWrapper(gapic_types.PubsubMessage(data=b"last one"), None) + ) assert commit.call_count == 0 assert future is None @@ -529,12 +626,220 @@ def __call__(self, success): self.success = success +# Refer https://opentelemetry.io/docs/languages/python/#version-support +@pytest.mark.skipif( + sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher" +) +def test_commit_otel_publish_rpc_span_exception(span_exporter): + TOPIC = "projects/projectID/topics/topicID" + batch = create_batch(topic=TOPIC, enable_open_telemetry=True) + + tracer = trace.get_tracer_provider().get_tracer("com.google.cloud.pubsub.v1") + with tracer.start_as_current_span(name="foo", end_on_exit=False) as create_span: + message = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo"), span=create_span + ) + batch.publish(message) + + # Mock publish error. + error = google.api_core.exceptions.InternalServerError("error") + + with mock.patch.object( + type(batch.client), + "_gapic_publish", + side_effect=error, + ): + batch._commit() + + spans = span_exporter.get_finished_spans() + # Span 1: publish RPC Span + # Span 2: create span of message. + assert len(spans) == 2 + + # Verify status of both spans recorded error and exception event. + for span in spans: + assert span.status.status_code == trace.status.StatusCode.ERROR + assert len(span.events) == 1 + assert span.events[0].name == "exception" + assert span.end_time is not None + + +# Refer https://opentelemetry.io/docs/languages/python/#version-support +@pytest.mark.skipif( + sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher" +) +def test_commit_otel_null_span(span_exporter): + """ + Test case checks the null case check scenario and appeases code coverage. 
+ This scenario where open telemetry is enabled, yet the message added to the + batch does not contain a span should not arise in the library. + """ + TOPIC = "projects/projectID/topics/topic" + batch = create_batch( + topic=TOPIC, + enable_open_telemetry=True, + ) + + msg = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo"), + ) + + batch.publish(msg) + + publish_response = gapic_types.PublishResponse(message_ids=["a"]) + with mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ): + batch._commit() + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == f"{TOPIC} publish" + assert spans[0].end_time is not None + + error = google.api_core.exceptions.InternalServerError("error") + batch = create_batch( + topic=TOPIC, + enable_open_telemetry=True, + ) + batch.publish(msg) + + with mock.patch.object( + type(batch.client), + "_gapic_publish", + side_effect=error, + ): + batch._commit() + + assert len(spans) == 1 + assert spans[0].name == f"{TOPIC} publish" + assert spans[0].end_time is not None + + +# Refer https://opentelemetry.io/docs/languages/python/#version-support +@pytest.mark.skipif( + sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher" +) +def test_commit_otel_publish_non_sampled(span_exporter): + TOPIC = "projects/projectID/topics/topic" + batch = create_batch( + topic=TOPIC, + enable_open_telemetry=True, + ) + + tracer = trace.get_tracer_provider().get_tracer("com.google.cloud.pubsub.v1") + with tracer.start_as_current_span(name="foo", end_on_exit=False) as span: + span.is_recording = mock.Mock(return_value=False) + msg = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo"), span=span + ) + + batch.publish(msg) + + publish_response = gapic_types.PublishResponse(message_ids=["a"]) + with mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ): + batch._commit() + + spans = 
span_exporter.get_finished_spans() + + # Span 1: publish RPC Span + # Span 2: create span of message. + assert len(spans) == 2 + publish_rpc_span, create_span = spans + assert len(publish_rpc_span.links) == 0 + assert len(create_span.links) == 0 + + +@pytest.mark.skipif( + sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher" +) +def test_commit_otel_publish_rpc_span(span_exporter): + TOPIC = "projects/projectID/topics/topic" + batch = create_batch( + topic=TOPIC, + enable_open_telemetry=True, + ) + + # Simulate message 1 published with its own publisher create span. + tracer = trace.get_tracer_provider().get_tracer("com.google.cloud.pubsub.v1") + with tracer.start_as_current_span(name="foo", end_on_exit=False) as create_span1: + msg1 = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo"), span=create_span1 + ) + + # Simulate message 2 published with its own publisher create span. + with tracer.start_as_current_span(name="bar", end_on_exit=False) as create_span2: + msg2 = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"bar"), + span=create_span2, + ) + + # Add both messages to the batch. + batch.publish(msg1) + batch.publish(msg2) + + publish_response = gapic_types.PublishResponse(message_ids=["a", "b"]) + with mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ): + batch._commit() + + spans = span_exporter.get_finished_spans() + + # Span 1: publish RPC span - closed after publish RPC success. + # Span 2: publisher create span of message 1 - closed after publish RPC success. + # Span 3: publisher create span of message 2 - closed after publish RPC success. 
+ assert len(spans) == 3 + # publish_rpc_span, create_span_1, create_span2, non_sampled_span = spans + publish_rpc_span, create_span_1, create_span2 = spans + + # Verify publish_rpc_span + assert publish_rpc_span.name == f"{TOPIC} publish" + assert publish_rpc_span.kind == trace.SpanKind.CLIENT + assert publish_rpc_span.end_time is not None + attributes = publish_rpc_span.attributes + assert attributes["messaging.system"] == "com.google.cloud.pubsub.v1" + assert attributes["messaging.destination.name"] == TOPIC + assert attributes["gcp.project_id"] == "projectID" + assert attributes["messaging.batch.message_count"] == 2 + assert attributes["messaging.operation"] == "publish" + assert attributes["code.function"] == "_commit" + assert publish_rpc_span.parent is None + # Verify the links correspond to the spans of the published messages. + assert len(publish_rpc_span.links) == 2 + assert publish_rpc_span.links[0].context[1] == create_span_1.context[1] + assert publish_rpc_span.links[1].context[1] == create_span2.context[1] + + # Verify spans of the published messages. + assert create_span_1.name == "foo" + assert create_span2.name == "bar" + + # Verify the publish create spans have been closed after publish success. + assert create_span1.end_time is not None + assert create_span2.end_time is not None + + # Verify message_ids returned from gapic publish are added as attributes + # to the publisher create spans of the messages. 
+ assert "messaging.message.id" in create_span1.attributes + assert create_span1.attributes["messaging.message.id"] == "a" + assert "messaging.message.id" in create_span2.attributes + assert create_span2.attributes["messaging.message.id"] == "b" + + # Verify publish end event added to the span + assert len(create_span1.events) == 1 + assert len(create_span2.events) == 1 + assert create_span1.events[0].name == "publish end" + assert create_span2.events[0].name == "publish end" + + def test_batch_done_callback_called_on_success(): batch_done_callback_tracker = BatchDoneCallbackTracker() batch = create_batch(batch_done_callback=batch_done_callback_tracker) # Ensure messages exist. - message = gapic_types.PubsubMessage(data=b"foobarbaz") + message = PublishMessageWrapper(gapic_types.PubsubMessage(data=b"foobarbaz"), None) batch.publish(message) # One response for one published message. @@ -554,7 +859,7 @@ def test_batch_done_callback_called_on_publish_failure(): batch = create_batch(batch_done_callback=batch_done_callback_tracker) # Ensure messages exist. - message = gapic_types.PubsubMessage(data=b"foobarbaz") + message = PublishMessageWrapper(gapic_types.PubsubMessage(data=b"foobarbaz"), None) batch.publish(message) # One response for one published message. @@ -580,7 +885,7 @@ def test_batch_done_callback_called_on_publish_response_invalid(): batch = create_batch(batch_done_callback=batch_done_callback_tracker) # Ensure messages exist. - message = gapic_types.PubsubMessage(data=b"foobarbaz") + message = PublishMessageWrapper(gapic_types.PubsubMessage(data=b"foobarbaz"), None) batch.publish(message) # No message ids returned in successful publish response -> invalid. 
diff --git a/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py b/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py index 7570c2970..231c803f5 100644 --- a/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py +++ b/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py @@ -27,12 +27,17 @@ from google.cloud.pubsub_v1 import publisher from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) _ORDERING_KEY = "ordering_key_1" def create_message(): - return gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + return PublishMessageWrapper( + gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}), None + ) def create_client(): diff --git a/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py b/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py index 01d9d6ca4..2caeb9afe 100644 --- a/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py +++ b/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py @@ -27,10 +27,15 @@ from google.cloud.pubsub_v1.publisher._batch import base from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) def create_message(): - return gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + return PublishMessageWrapper( + gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}), None + ) def create_client(): @@ -140,7 +145,9 @@ def test_publish_after_batch_error(): batch = client._batch_class( client, "topic_name", types.BatchSettings(max_latency=float("inf")) ) - batch._messages.append(mock.Mock(name="message")) # Make batch truthy (non-empty). 
+ batch._message_wrappers.append( + mock.Mock(name="message") + ) # Make batch truthy (non-empty). sequencer = unordered_sequencer.UnorderedSequencer(client, "topic_name") sequencer._set_batch(batch) diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 9db5e0ef8..3e1ce8a63 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -37,11 +37,16 @@ from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer +from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) from google.pubsub_v1 import types as gapic_types from google.pubsub_v1.services.publisher import client as publisher_client from google.pubsub_v1.services.publisher.transports.grpc import PublisherGrpcTransport +from opentelemetry import trace + def _assert_retries_equal(retry, retry2): # Retry instances cannot be directly compared, because their predicates are @@ -213,6 +218,55 @@ def test_message_ordering_enabled(creds): assert client._enable_message_ordering +def test_publish_otel_batching_exception(creds, span_exporter): + if sys.version_info.major == 3 and sys.version_info.minor < 8: + with pytest.warns( + RuntimeWarning, + match="OpenTelemetry for Python version 3.7 or lower is not supported. 
Disabling open telemetry tracing.", + ): + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=True, + ), + ) + assert client._open_telemetry_enabled is False + return + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=True, + ), + ) + + # Throw an exception when sequencer.publish() is called + sequencer = mock.Mock(spec=ordered_sequencer.OrderedSequencer) + sequencer.publish = mock.Mock(side_effect=RuntimeError("some error")) + client._get_or_create_sequencer = mock.Mock(return_value=sequencer) + + TOPIC = "projects/projectID/topics/topicID" + with pytest.raises(RuntimeError): + client.publish(TOPIC, b"message") + + spans = span_exporter.get_finished_spans() + + # Span 1: Publisher Flow Control span + # Span 2: Publisher Batching span(ended after exception) + # Span 3: Create Publish span(ended after exception) + assert len(spans) == 3 + + publish_batching_span = spans[1] + publish_create_span = spans[2] + assert publish_batching_span.name == "publisher batching" + assert publish_batching_span.kind == trace.SpanKind.INTERNAL + assert publish_batching_span._parent[1] == publish_create_span._context[1] + + # Verify exception recorded by the Publisher Batching span. + assert publish_batching_span.status.status_code == trace.StatusCode.ERROR + assert len(publish_batching_span.events) == 1 + assert publish_batching_span.events[0].name == "exception" + + def test_publish(creds): client = publisher.Client(credentials=creds) @@ -240,14 +294,106 @@ def test_publish(creds): # Check mock. 
batch.publish.assert_has_calls( [ - mock.call(gapic_types.PubsubMessage(data=b"spam")), mock.call( - gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"spam"), + span=None, + ) + ), + mock.call( + PublishMessageWrapper( + message=gapic_types.PubsubMessage( + data=b"foo", attributes={"bar": "baz"} + ), + span=None, + ) ), ] ) +def test_publish_otel_context_propagation(creds): + TOPIC = "projects/projectID/topics/topicID" + if sys.version_info.major == 3 and sys.version_info.minor < 8: + with pytest.warns( + RuntimeWarning, + match="OpenTelemetry for Python version 3.7 or lower is not supported. Disabling open telemetry tracing.", + ): + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=True, + ), + ) + assert client._open_telemetry_enabled is False + return + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=True, + ), + ) + + flow_controller_add_mock = mock.Mock( + spec=publisher.flow_controller.FlowController.add + ) + client._flow_controller.add = flow_controller_add_mock + client.publish(TOPIC, b"message") + + flow_controller_add_mock.assert_called_once() + args = flow_controller_add_mock.call_args.args + assert len(args) == 1 + assert "googclient_traceparent" in args[0].attributes + + +def test_publish_otel(creds, span_exporter): + TOPIC = "projects/projectID/topics/topicID" + if sys.version_info.major == 3 and sys.version_info.minor < 8: + with pytest.warns( + RuntimeWarning, + match="OpenTelemetry for Python version 3.7 or lower is not supported. 
Disabling open telemetry tracing.",
+        ):
+            client = publisher.Client(
+                credentials=creds,
+                publisher_options=types.PublisherOptions(
+                    enable_open_telemetry_tracing=True,
+                ),
+            )
+            assert client._open_telemetry_enabled is False
+            return
+    client = publisher.Client(
+        credentials=creds,
+        publisher_options=types.PublisherOptions(
+            enable_open_telemetry_tracing=True,
+        ),
+    )
+
+    client.publish(TOPIC, b"message")
+
+    spans = span_exporter.get_finished_spans()
+
+    # Publish Create Span is still active, hence not returned
+    # by the exporter.
+    # span1 = Publisher Flow Control Span
+    # span2 = Publisher Batching Span
+    assert len(spans) == 2
+
+    flow_control_span, batching_span = spans
+
+    # Verify flow control span.
+    assert flow_control_span.name == "publisher flow control"
+    assert flow_control_span.kind == trace.SpanKind.INTERNAL
+
+    # Verify that flow control span is a child of the publish create span.
+    # assert flow_control_span._parent[1] == spans[0]._context[1]
+
+    # Verify Publisher Batching Span
+    assert batching_span.name == "publisher batching"
+    assert batching_span.kind == trace.SpanKind.INTERNAL
+    # Verify Publisher Batching Span is child of Publish Create Span
+    # assert batching_span._parent[1] == spans[0]._context[1]
+
+
 def test_publish_error_exceeding_flow_control_limits(creds):
     publisher_options = types.PublisherOptions(
         flow_control=types.PublishFlowControl(
@@ -270,6 +416,70 @@ def test_publish_error_exceeding_flow_control_limits(creds):
         future2.result()
 
 
+def test_publish_otel_flow_control_exception(creds, span_exporter):
+    if sys.version_info.major == 3 and sys.version_info.minor < 8:
+        with pytest.warns(
+            RuntimeWarning,
+            match="OpenTelemetry for Python version 3.7 or lower is not supported. 
Disabling open telemetry tracing.", + ): + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=True, + ), + ) + assert client._open_telemetry_enabled is False + return + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=True, + ), + ) + + client._flow_controller = mock.Mock(spec=publisher.flow_controller.FlowController) + client._flow_controller.add = mock.Mock( + side_effect=exceptions.FlowControlLimitError + ) + + TOPIC = "projects/projectID/topics/topicID" + client.publish(TOPIC, b"message") + + spans = span_exporter.get_finished_spans() + + # Span 1: Publisher Flow Control Span(closed after exception) + # Span 2: Publisher Create Span(closed after exception) + assert len(spans) == 2 + + flow_control_span, create_span = spans + + assert create_span.name == f"{TOPIC} create" + assert create_span.status.status_code == trace.StatusCode.ERROR + attributes = create_span.attributes + assert attributes["messaging.system"] == "gcp_pubsub" + assert attributes["messaging.destination.name"] == TOPIC + assert attributes["code.function"] == "google.cloud.pubsub.PublisherClient.publish" + assert attributes["messaging.gcp_pubsub.message.ordering_key"] == "" + assert attributes["messaging.operation"] == "create" + assert attributes["gcp.project_id"] == "projectID" + assert "messaging.message.body.size" in attributes + assert create_span.kind == trace.SpanKind.PRODUCER + + assert len(create_span.events) == 2 + # Verify start event + start_event = create_span.events[0] + assert start_event.name == "publish start" + assert "timestamp" in start_event.attributes + assert create_span.events[1].name == "exception" + + assert flow_control_span.name == "publisher flow control" + assert flow_control_span.status.status_code == trace.StatusCode.ERROR + assert flow_control_span.kind == trace.SpanKind.INTERNAL + assert len(flow_control_span.events) 
== 1 + assert flow_control_span.events[0].name == "exception" + assert flow_control_span._parent[1] == create_span._context[1] + + def test_publish_data_not_bytestring_error(creds): client = publisher.Client(credentials=creds) topic = "topic/path" @@ -381,7 +591,14 @@ def test_publish_attrs_bytestring(creds): # The attributes should have been sent as text. batch.publish.assert_called_once_with( - gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + # gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + PublishMessageWrapper( + message=gapic_types.PubsubMessage( + data=b"foo", + attributes={"bar": "baz"}, + ), + span=None, + ) ) @@ -421,8 +638,12 @@ def test_publish_new_batch_needed(creds): commit_timeout=gapic_v1.method.DEFAULT, ) message_pb = gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) - batch1.publish.assert_called_once_with(message_pb) - batch2.publish.assert_called_once_with(message_pb) + wrapper = PublishMessageWrapper( + message=message_pb, + span=None, + ) + batch1.publish.assert_called_once_with(wrapper) + batch2.publish.assert_called_once_with(wrapper) def test_publish_attrs_type_error(creds): @@ -445,9 +666,9 @@ def test_publish_custom_retry_overrides_configured_retry(creds): client.publish(topic, b"hello!", retry=mock.sentinel.custom_retry) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY + message=mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY ) - message = fake_sequencer.publish.call_args.args[0] + message = fake_sequencer.publish.call_args.kwargs["message"].message assert message.data == b"hello!" 
@@ -464,9 +685,9 @@ def test_publish_custom_timeout_overrides_configured_timeout(creds): client.publish(topic, b"hello!", timeout=mock.sentinel.custom_timeout) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout + message=mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout ) - message = fake_sequencer.publish.call_args.args[0] + message = fake_sequencer.publish.call_args.kwargs["message"].message assert message.data == b"hello!" @@ -626,12 +847,22 @@ def test_publish_with_ordering_key(creds): # Check mock. batch.publish.assert_has_calls( [ - mock.call(gapic_types.PubsubMessage(data=b"spam", ordering_key="k1")), mock.call( - gapic_types.PubsubMessage( - data=b"foo", attributes={"bar": "baz"}, ordering_key="k1" + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"spam", ordering_key="k1"), + span=None, ) ), + mock.call( + PublishMessageWrapper( + message=gapic_types.PubsubMessage( + data=b"foo", + attributes={"bar": "baz"}, + ordering_key="k1", + ), + span=None, + ), + ), ] ) diff --git a/tests/unit/pubsub_v1/subscriber/test_context_getter.py b/tests/unit/pubsub_v1/subscriber/test_context_getter.py new file mode 100644 index 000000000..aea21db49 --- /dev/null +++ b/tests/unit/pubsub_v1/subscriber/test_context_getter.py @@ -0,0 +1,32 @@ +from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.opentelemetry.context_propagation import OTelContextGetter + + +def test_get(): + message = gapic_types.PubsubMessage( + data=b"data", + attributes={ + "non_otel_key": "non_otel_key_value", + "googclient_traceparent": "traceparent_value", + }, + ) + val = OTelContextGetter().get(message, "traceparent") + assert val == ["traceparent_value"] + + val = OTelContextGetter().get(message, "non_existent_key") + assert val == [""] + + +def test_keys(): + message = gapic_types.PubsubMessage( + data=b"data", + attributes={ + "non_otel_key": "non_otel_key_value", + 
"googclient_traceparent": "traceparent_value", + "googclient_tracestate": "tracestate_value", + }, + ) + keys = OTelContextGetter().keys(message) + assert "non_otel_key" in keys + assert "googclient_traceparent" in keys + assert "googclient_tracestate" in keys diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index 89d72c61d..965e2c1d2 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -22,6 +22,9 @@ from google.cloud.pubsub_v1.subscriber._protocol import requests from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager from google.cloud.pubsub_v1.subscriber import futures +from google.cloud.pubsub_v1.opentelemetry.subscribe_spans_data import OpenTelemetryData + +from opentelemetry import trace # special case python < 3.8 if sys.version_info.major == 3 and sys.version_info.minor < 8: @@ -35,6 +38,34 @@ ) +def test_modack_otel(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + subscribe_span = mock.Mock(spec=trace.Span) + items = [ + requests.ModAckRequest( + "0", + 0, + None, + open_telemetry_data=OpenTelemetryData(subscribe_span=subscribe_span), + ), + requests.ModAckRequest("1", 0, None, open_telemetry_data=OpenTelemetryData()), + ] + + with mock.patch.object(dispatcher_, "modify_ack_deadline"): + dispatcher_.dispatch_callback(items) + + # Assert that modack start and end events were added to the subscribe span. 
+ subscribe_span.add_event.assert_has_calls( + [ + mock.call("modack start"), + mock.call("modack end"), + ] + ) + + @pytest.mark.parametrize( "item,method_name", [ @@ -60,6 +91,14 @@ def test_dispatch_callback_active_manager(item, method_name): manager._exactly_once_delivery_enabled.assert_called() +def test_manager_property(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager=manager, queue=mock.sentinel.queue) + assert dispatcher_.manager is manager + + @pytest.mark.parametrize( "item,method_name", [ @@ -365,10 +404,20 @@ def test_unknown_request_type(): dispatcher_.dispatch_callback(items) -def test_ack(): +@pytest.mark.parametrize( + "otel_enabled, otel_data", + [ + (False, None), + (True, OpenTelemetryData(subscribe_span=mock.Mock(spec=trace.Span))), + (True, None), + (True, OpenTelemetryData()), + ], +) +def test_ack(otel_enabled, otel_data): manager = mock.create_autospec( streaming_pull_manager.StreamingPullManager, instance=True ) + manager.open_telemetry_enabled = otel_enabled dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) items = [ @@ -378,7 +427,8 @@ def test_ack(): time_to_ack=20, ordering_key="", future=None, - ) + open_telemetry_data=otel_data, + ), ] manager.send_unary_ack.return_value = (items, []) dispatcher_.ack(items) @@ -390,6 +440,12 @@ def test_ack(): manager.leaser.remove.assert_called_once_with(items) manager.maybe_resume_consumer.assert_called_once() manager.ack_histogram.add.assert_called_once_with(20) + if otel_data and otel_data.subscribe_span: + otel_data.subscribe_span.set_attribute.assert_called_with( + key="messaging.gcp_pubsub.result", + value="ack", + ) + otel_data.subscribe_span.add_event.assert_called_with("ack end") def test_ack_no_time(): @@ -481,10 +537,20 @@ def test_retry_acks_in_new_thread(): assert ctor_call.kwargs["daemon"] -def test_retry_acks(): +@pytest.mark.parametrize( + "otel_enabled,otel_data", + [ + 
(False, None), + (True, OpenTelemetryData(subscribe_span=mock.Mock(spec=trace.Span))), + (True, OpenTelemetryData()), + (True, None), + ], +) +def test_retry_acks(otel_enabled, otel_data): manager = mock.create_autospec( streaming_pull_manager.StreamingPullManager, instance=True ) + manager.open_telemetry_enabled = otel_enabled dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) f = futures.Future() @@ -495,6 +561,7 @@ def test_retry_acks(): time_to_ack=20, ordering_key="", future=f, + open_telemetry_data=otel_data, ) ] # first and second `send_unary_ack` calls fail, third one succeeds @@ -516,6 +583,13 @@ def test_retry_acks(): ] ) + if otel_data and otel_data.subscribe_span: + otel_data.subscribe_span.set_attribute.assert_called_with( + key="messaging.gcp_pubsub.result", + value="ack", + ) + otel_data.subscribe_span.add_event.assert_called_with("ack end") + def test_retry_modacks_in_new_thread(): manager = mock.create_autospec( @@ -633,15 +707,29 @@ def test_drop_ordered_messages(): manager.maybe_resume_consumer.assert_called_once() -def test_nack(): +@pytest.mark.parametrize( + "otel_enabled,otel_data", + [ + (False, None), + (True, OpenTelemetryData(subscribe_span=mock.Mock(spec=trace.Span))), + (True, None), + (True, OpenTelemetryData()), + ], +) +def test_nack(otel_enabled, otel_data): manager = mock.create_autospec( streaming_pull_manager.StreamingPullManager, instance=True ) + manager.open_telemetry_enabled = otel_enabled dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) items = [ requests.NackRequest( - ack_id="ack_id_string", byte_size=10, ordering_key="", future=None + ack_id="ack_id_string", + byte_size=10, + ordering_key="", + future=None, + open_telemetry_data=otel_data, ) ] manager.send_unary_modack.return_value = (items, []) @@ -657,10 +745,20 @@ def test_nack(): ack_reqs_dict = call[1]["ack_reqs_dict"] assert ack_reqs_dict == { "ack_id_string": requests.ModAckRequest( - ack_id="ack_id_string", seconds=0, future=None + 
ack_id="ack_id_string", + seconds=0, + future=None, + open_telemetry_data=otel_data, ) } + if otel_data and otel_data.subscribe_span: + otel_data.subscribe_span.set_attribute.assert_called_with( + key="messaging.gcp_pubsub.result", + value="nack", + ) + otel_data.subscribe_span.add_event.assert_called_with("nack end") + def test_modify_ack_deadline(): manager = mock.create_autospec( diff --git a/tests/unit/pubsub_v1/subscriber/test_leaser.py b/tests/unit/pubsub_v1/subscriber/test_leaser.py index f38717c6f..e6d6e1b75 100644 --- a/tests/unit/pubsub_v1/subscriber/test_leaser.py +++ b/tests/unit/pubsub_v1/subscriber/test_leaser.py @@ -23,6 +23,8 @@ from google.cloud.pubsub_v1.subscriber._protocol import requests from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager +from opentelemetry import trace + # special case python < 3.8 if sys.version_info.major == 3 and sys.version_info.minor < 8: import mock @@ -82,7 +84,7 @@ def test_remove_negative_bytes(caplog): assert "unexpectedly negative" in caplog.text -def create_manager(flow_control=types.FlowControl()): +def create_manager(flow_control=types.FlowControl(), open_telemetry_enabled=False): manager = mock.create_autospec( streaming_pull_manager.StreamingPullManager, instance=True ) @@ -91,6 +93,8 @@ def create_manager(flow_control=types.FlowControl()): manager.flow_control = flow_control manager.ack_histogram = histogram.Histogram() manager._obtain_ack_deadline.return_value = 10 + manager._subscription = "projects/projectID/subscriptions/subscriptionID" + manager.open_telemetry_enabled = open_telemetry_enabled return manager @@ -136,23 +140,60 @@ def trigger_done(timeout): leaser._stop_event.wait = trigger_done -def test_maintain_leases_ack_ids(): - manager = create_manager() +@pytest.mark.parametrize( + "otel_enabled", + [ + (False), + (True), + ], +) +def test_maintain_leases_ack_ids(otel_enabled, span_exporter): + manager = create_manager(open_telemetry_enabled=otel_enabled) leaser_ = 
leaser.Leaser(manager) make_sleep_mark_event_as_done(leaser_) + subscribe_span_mock = mock.Mock() leaser_.add( [requests.LeaseRequest(ack_id="my ack id", byte_size=50, ordering_key="")] ) + leaser_.add( + [ + requests.LeaseRequest( + ack_id="my ack id2", + byte_size=50, + ordering_key="", + subscribe_span=subscribe_span_mock, + ) + ] + ) + manager._send_lease_modacks.return_value = set() leaser_.maintain_leases() assert len(manager._send_lease_modacks.mock_calls) == 1 call = manager._send_lease_modacks.mock_calls[0] ack_ids = list(call.args[0]) - assert ack_ids == ["my ack id"] + assert ack_ids == ["my ack id", "my ack id2"] assert call.args[1] == 10 + if otel_enabled: + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + modack_span = spans[0] + assert ( + modack_span.name == "projects/projectID/subscriptions/subscriptionID modack" + ) + assert modack_span.kind == trace.SpanKind.CLIENT + attributes = modack_span.attributes + assert attributes["messaging.system"] == "gcp_pubsub" + assert attributes["messaging.batch.message_count"] == 2 + assert attributes["messaging.gcp_pubsub.message.ack_deadline"] == 10 + assert attributes["messaging.gcp_pubsub.is_receipt_modack"] is False + assert attributes["messaging.destination.name"] == "subscriptionID" + assert attributes["messaging.operation.name"] == "modack" + assert attributes["code.function"] == "maintain_leases" + assert len(modack_span.links) == 1 + def test_maintain_leases_expired_ack_ids_ignored(): manager = create_manager() diff --git a/tests/unit/pubsub_v1/subscriber/test_message.py b/tests/unit/pubsub_v1/subscriber/test_message.py index 49b07b7fd..7389acecc 100644 --- a/tests/unit/pubsub_v1/subscriber/test_message.py +++ b/tests/unit/pubsub_v1/subscriber/test_message.py @@ -16,6 +16,7 @@ import queue import sys import time +import pytest # special case python < 3.8 if sys.version_info.major == 3 and sys.version_info.minor < 8: @@ -29,6 +30,8 @@ from google.protobuf import timestamp_pb2 from 
google.pubsub_v1 import types as gapic_types from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeStatus +from google.cloud.pubsub_v1.opentelemetry.subscribe_spans_data import OpenTelemetryData +from opentelemetry import trace RECEIVED = datetime.datetime(2012, 4, 21, 15, 0, tzinfo=datetime.timezone.utc) @@ -44,6 +47,7 @@ def create_message( delivery_attempt=0, ordering_key="", exactly_once_delivery_enabled=False, + open_telemetry_data=None, **attrs ): with mock.patch.object(time, "time") as time_: @@ -65,6 +69,7 @@ def create_message( delivery_attempt=delivery_attempt, request_queue=queue.Queue(), exactly_once_delivery_enabled_func=lambda: exactly_once_delivery_enabled, + open_telemetry_data=open_telemetry_data, ) return msg @@ -131,8 +136,23 @@ def check_call_types(mock, *args, **kwargs): assert isinstance(call_args[n], argtype) -def test_ack(): - msg = create_message(b"foo", ack_id="bogus_ack_id") +@pytest.mark.parametrize( + "otel_data,", + [ + None, + OpenTelemetryData( + subscribe_span=mock.Mock(spec=trace.Span), + process_span=mock.Mock(spec=trace.Span), + ), + OpenTelemetryData(), + ], +) +def test_ack(otel_data): + msg = create_message( + data=b"foo", + ack_id="bogus_ack_id", + open_telemetry_data=otel_data, + ) with mock.patch.object(msg._request_queue, "put") as put: msg.ack() put.assert_called_once_with( @@ -142,13 +162,33 @@ def test_ack(): time_to_ack=mock.ANY, ordering_key="", future=None, + open_telemetry_data=otel_data, ) ) check_call_types(put, requests.AckRequest) - - -def test_ack_with_response_exactly_once_delivery_disabled(): - msg = create_message(b"foo", ack_id="bogus_ack_id") + if otel_data and otel_data.subscribe_span: + otel_data.subscribe_span.add_event.assert_called_with(name="ack start") + if otel_data and otel_data.process_span: + otel_data.process_span.add_event.assert_called_with(name="ack called") + + +@pytest.mark.parametrize( + "otel_data", + [ + None, + OpenTelemetryData( + 
subscribe_span=mock.Mock(spec=trace.Span), + process_span=mock.Mock(spec=trace.Span), + ), + OpenTelemetryData(), + ], +) +def test_ack_with_response_exactly_once_delivery_disabled(otel_data): + msg = create_message( + b"foo", + ack_id="bogus_ack_id", + open_telemetry_data=otel_data, + ) with mock.patch.object(msg._request_queue, "put") as put: future = msg.ack_with_response() put.assert_called_once_with( @@ -158,12 +198,18 @@ def test_ack_with_response_exactly_once_delivery_disabled(): time_to_ack=mock.ANY, ordering_key="", future=None, + open_telemetry_data=otel_data, ) ) assert future.result() == AcknowledgeStatus.SUCCESS assert future == message._SUCCESS_FUTURE check_call_types(put, requests.AckRequest) + if otel_data and otel_data.subscribe_span: + otel_data.subscribe_span.add_event.assert_called_with(name="ack start") + if otel_data and otel_data.process_span: + otel_data.process_span.add_event.assert_called_with(name="ack called") + def test_ack_with_response_exactly_once_delivery_enabled(): msg = create_message( @@ -227,30 +273,76 @@ def test_modify_ack_deadline_with_response_exactly_once_delivery_enabled(): check_call_types(put, requests.ModAckRequest) -def test_nack(): - msg = create_message(b"foo", ack_id="bogus_ack_id") +@pytest.mark.parametrize( + "otel_data,", + [ + None, + OpenTelemetryData( + subscribe_span=mock.Mock(spec=trace.Span), + process_span=mock.Mock(spec=trace.Span), + ), + OpenTelemetryData(), + ], +) +def test_nack(otel_data): + msg = create_message( + data=b"foo", + ack_id="bogus_ack_id", + open_telemetry_data=otel_data, + ) with mock.patch.object(msg._request_queue, "put") as put: msg.nack() put.assert_called_once_with( requests.NackRequest( - ack_id="bogus_ack_id", byte_size=30, ordering_key="", future=None + ack_id="bogus_ack_id", + byte_size=30, + ordering_key="", + future=None, + open_telemetry_data=otel_data, ) ) check_call_types(put, requests.NackRequest) - - -def test_nack_with_response_exactly_once_delivery_disabled(): - msg = 
create_message(b"foo", ack_id="bogus_ack_id") + if otel_data and otel_data.subscribe_span: + otel_data.subscribe_span.add_event.assert_called_with("nack start") + if otel_data and otel_data.process_span: + otel_data.process_span.add_event.assert_called_with("nack called") + + +@pytest.mark.parametrize( + "otel_data", + [ + None, + OpenTelemetryData( + subscribe_span=mock.Mock(spec=trace.Span), + process_span=mock.Mock(spec=trace.Span), + ), + OpenTelemetryData(), + ], +) +def test_nack_with_response_exactly_once_delivery_disabled(otel_data): + msg = create_message( + data=b"foo", + ack_id="bogus_ack_id", + open_telemetry_data=otel_data, + ) with mock.patch.object(msg._request_queue, "put") as put: future = msg.nack_with_response() put.assert_called_once_with( requests.NackRequest( - ack_id="bogus_ack_id", byte_size=30, ordering_key="", future=None + ack_id="bogus_ack_id", + byte_size=30, + ordering_key="", + future=None, + open_telemetry_data=otel_data, ) ) assert future.result() == AcknowledgeStatus.SUCCESS assert future == message._SUCCESS_FUTURE check_call_types(put, requests.NackRequest) + if otel_data and otel_data.subscribe_span: + otel_data.subscribe_span.add_event.assert_called_with("nack start") + if otel_data and otel_data.process_span: + otel_data.process_span.add_event.assert_called_with("nack called") def test_nack_with_response_exactly_once_delivery_enabled(): diff --git a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py index 5e1dcf91b..df08bba66 100644 --- a/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py +++ b/tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py @@ -13,13 +13,24 @@ # limitations under the License. 
import queue +import sys from google.cloud.pubsub_v1.subscriber import message from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.opentelemetry.subscribe_spans_data import OpenTelemetryData +from opentelemetry.trace.span import SpanContext +from opentelemetry import trace -def make_message(ack_id, ordering_key): +# special case python < 3.8 +if sys.version_info.major == 3 and sys.version_info.minor < 8: + import mock +else: + from unittest import mock + + +def make_message(ack_id, ordering_key, open_telemetry_data=None): proto_msg = gapic_types.PubsubMessage(data=b"Q", ordering_key=ordering_key) return message.Message( proto_msg._pb, @@ -27,6 +38,7 @@ def make_message(ack_id, ordering_key): 0, queue.Queue(), exactly_once_delivery_enabled_func=lambda: False, # pragma: NO COVER + open_telemetry_data=open_telemetry_data, ) @@ -37,6 +49,25 @@ def test_init(): assert moh.get() is None +def test_put_otel(): + moh = messages_on_hold.MessagesOnHold() + subscribe_span = mock.Mock(spec=SpanContext) + msg = make_message( + ack_id="ack_id1", + ordering_key="key1", + open_telemetry_data=OpenTelemetryData( + subscribe_span=subscribe_span, + ), + ) + moh.put(msg) + + scheduler_span = msg.open_telemetry_data.scheduler_span + assert scheduler_span is not None + assert scheduler_span.name == "subscriber scheduler" + assert scheduler_span.kind == trace.SpanKind.INTERNAL + assert scheduler_span.is_recording() + + def test_put_and_get_unordered_messages(): moh = messages_on_hold.MessagesOnHold() diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 278f3e88e..382367800 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -41,6 +41,9 @@ from google.cloud.pubsub_v1.subscriber import exceptions as 
subscriber_exceptions from google.cloud.pubsub_v1.subscriber import futures from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.opentelemetry.subscribe_spans_data import OpenTelemetryData +from opentelemetry.trace import Span +from opentelemetry import trace import grpc from google.rpc import status_pb2 from google.rpc import code_pb2 @@ -65,12 +68,85 @@ def test__wrap_as_exception(exception, expected_cls): ) +def test__wrap_callback_errors_otel_process_span(): + msg = mock.Mock() + callback = mock.Mock() + tracer = trace.get_tracer("google.cloud.pubsub_v1.subscriber") + with tracer.start_as_current_span("publisher_create_span") as publisher_create_span: + with tracer.start_as_current_span("subscriber_span") as subscribe_span: + msg.open_telemetry_data.subscribe_span = subscribe_span + streaming_pull_manager._wrap_callback_errors( + callback, + mock.Mock(), + "projects/projectID/subscriptions/subscriptionID", + msg, + ) + + callback.assert_called_once() + process_span = msg.open_telemetry_data.process_span + assert process_span is not None + assert process_span.name == "subscriptionID process" + assert process_span.kind == trace.SpanKind.INTERNAL + assert process_span.parent.span_id == subscribe_span.context.span_id + # Verify that the link to create span of the associated message is present. 
+ assert len(process_span.links) == 1 + assert ( + process_span.links[0].context.span_id == publisher_create_span.context.span_id + ) + + +@pytest.mark.parametrize( + "otel_data", + [ + ( + OpenTelemetryData( + concurrency_control_span=mock.Mock(spec=Span), + scheduler_span=mock.Mock(spec=Span), + ) + ), + ( + OpenTelemetryData( + concurrency_control_span=None, + scheduler_span=None, + ) + ), + (None), + ], +) +def test__wrap_callback_errors_no_error_otel(otel_data): + msg = mock.Mock() + msg.open_telemetry_data = otel_data + callback = mock.Mock() + on_callback_error = mock.Mock() + + streaming_pull_manager._wrap_callback_errors( + callback, + on_callback_error, + "projects/projectID/subscriptions/subscriptionID", + msg, + ) + + callback.assert_called_once_with(msg) + msg.nack.assert_not_called() + on_callback_error.assert_not_called() + + if otel_data and otel_data.concurrency_control_span: + otel_data.concurrency_control_span.end.assert_called_once() + if otel_data and otel_data.scheduler_span: + otel_data.scheduler_span.end.assert_called_once() + + def test__wrap_callback_errors_no_error(): msg = mock.create_autospec(message.Message, instance=True) callback = mock.Mock() on_callback_error = mock.Mock() - streaming_pull_manager._wrap_callback_errors(callback, on_callback_error, msg) + streaming_pull_manager._wrap_callback_errors( + callback, + on_callback_error, + "projects/projectID/subscriptions/subscriptionID", + msg, + ) callback.assert_called_once_with(msg) msg.nack.assert_not_called() @@ -89,7 +165,12 @@ def test__wrap_callback_errors_error(callback_error): callback = mock.Mock(side_effect=callback_error) on_callback_error = mock.Mock() - streaming_pull_manager._wrap_callback_errors(callback, on_callback_error, msg) + streaming_pull_manager._wrap_callback_errors( + callback, + on_callback_error, + "projects/projectID/subscriptions/subscriptionID", + msg, + ) msg.nack.assert_called_once() on_callback_error.assert_called_once_with(callback_error) @@ -97,6 
+178,7 @@ def test__wrap_callback_errors_error(callback_error): def test_constructor_and_default_state(): mock.sentinel.subscription = str() + mock.sentinel.client.open_telemetry_enabled = False manager = streaming_pull_manager.StreamingPullManager( mock.sentinel.client, mock.sentinel.subscription ) @@ -183,7 +265,10 @@ def make_manager(**kwargs): client_ = mock.create_autospec(client.Client, instance=True) scheduler_ = mock.create_autospec(scheduler.Scheduler, instance=True) return streaming_pull_manager.StreamingPullManager( - client_, "subscription-name", scheduler=scheduler_, **kwargs + client_, + "projects/projectID/subscriptions/subscriptionID", + scheduler=scheduler_, + **kwargs ) @@ -1224,14 +1309,14 @@ def test_open_has_been_closed(): manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error) -def make_running_manager(**kwargs): +def make_running_manager(enable_open_telemetry=False, **kwargs): manager = make_manager(**kwargs) manager._consumer = mock.create_autospec(bidi.BackgroundConsumer, instance=True) manager._consumer.is_active = True manager._dispatcher = mock.create_autospec(dispatcher.Dispatcher, instance=True) manager._leaser = mock.create_autospec(leaser.Leaser, instance=True) manager._heartbeater = mock.create_autospec(heartbeater.Heartbeater, instance=True) - + manager._open_telemetry_enabled = enable_open_telemetry return ( manager, manager._consumer, @@ -1415,7 +1500,10 @@ def test__get_initial_request(): initial_request = manager._get_initial_request(123) assert isinstance(initial_request, gapic_types.StreamingPullRequest) - assert initial_request.subscription == "subscription-name" + assert ( + initial_request.subscription + == "projects/projectID/subscriptions/subscriptionID" + ) assert initial_request.stream_ack_deadline_seconds == 123 assert initial_request.modify_deadline_ack_ids == [] assert initial_request.modify_deadline_seconds == [] @@ -1428,7 +1516,10 @@ def test__get_initial_request_wo_leaser(): initial_request = 
manager._get_initial_request(123) assert isinstance(initial_request, gapic_types.StreamingPullRequest) - assert initial_request.subscription == "subscription-name" + assert ( + initial_request.subscription + == "projects/projectID/subscriptions/subscriptionID" + ) assert initial_request.stream_ack_deadline_seconds == 123 assert initial_request.modify_deadline_ack_ids == [] assert initial_request.modify_deadline_seconds == [] @@ -1466,6 +1557,85 @@ def test__on_response_delivery_attempt(): assert msg2.delivery_attempt == 6 +@pytest.mark.parametrize("otel_enabled", [(True), (False)]) +def test__on_response_mod_ack_otel(span_exporter, otel_enabled): + manager, _, dispatcher, leaser, _, scheduler = make_running_manager( + enable_open_telemetry=otel_enabled, + ) + manager._callback = mock.sentinel.callback + + # Set up the messages. + response = gapic_types.StreamingPullResponse( + received_messages=[ + gapic_types.ReceivedMessage( + ack_id="ack_1", + message=gapic_types.PubsubMessage(data=b"foo", message_id="1"), + delivery_attempt=1, + ), + gapic_types.ReceivedMessage( + ack_id="ack_2", + message=gapic_types.PubsubMessage(data=b"bar", message_id="2"), + delivery_attempt=1, + ), + ] + ) + + # adjust message bookkeeping in leaser + fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=80) + + # Actually run the method and check that correct MODACK value is used. 
+ with mock.patch.object( + type(manager), "ack_deadline", new=mock.PropertyMock(return_value=18) + ), mock.patch("opentelemetry.trace.get_tracer") as mock_get_tracer: + mock_tracer = mock.MagicMock() + mock_get_tracer.return_value = mock_tracer + mock_span = mock.MagicMock() + mock_tracer.start_as_current_span.return_value = mock_span + mock_span.__enter__.return_value = mock_span + mock_span.__exit__.return_value = None + manager._on_response(response) + + dispatcher.modify_ack_deadline.assert_called_once_with( + [ + requests.ModAckRequest("ack_1", 18, None), + requests.ModAckRequest("ack_2", 18, None), + ], + 18, + ) + + # Subscribe span would still be active, hence would not be exported. + # Receipt modack span would be exported since it would be ended after receipt modacks are complete. + spans = span_exporter.get_finished_spans() + assert len(spans) == (1 if otel_enabled else 0) + if otel_enabled: + call_args = scheduler.schedule.call_args_list + assert len(call_args) == 2 + for call_arg in call_args: + args, _ = call_arg + otel_data = args[1].open_telemetry_data + assert otel_data.concurrency_control_span is not None + assert otel_data.concurrency_control_span.kind == trace.SpanKind.INTERNAL + assert ( + otel_data.concurrency_control_span.name + == "subscriber concurrency control" + ) + + receipt_modack_span = spans[0] + assert receipt_modack_span.name == "subscriptionID modack" + assert receipt_modack_span.kind == trace.SpanKind.CLIENT + assert len(receipt_modack_span.links) == 2 + + attributes = receipt_modack_span.attributes + assert attributes["messaging.system"] == "gcp_pubsub" + assert attributes["messaging.batch.message_count"] == 2 + assert attributes["messaging.gcp_pubsub.message.ack_deadline"] == 18 + assert attributes["messaging.gcp_pubsub.is_receipt_modack"] is True + assert attributes["messaging.destination.name"] == "subscriptionID" + assert attributes["gcp.project_id"] == "projectID" + assert attributes["messaging.operation.name"] == "modack" 
+ assert attributes["code.function"] == "_on_response" + + def test__on_response_modifies_ack_deadline(): manager, _, dispatcher, leaser, _, scheduler = make_running_manager() manager._callback = mock.sentinel.callback diff --git a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py index a09d85b00..45cfbe8c1 100644 --- a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py +++ b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py @@ -135,6 +135,17 @@ def test_init_emulator(monkeypatch): assert channel.target().decode("utf8") == _EXPECTED_TARGET +def test_otel_subscriber_option(creds): + client = subscriber.Client( + credentials=creds, + subscriber_options=types.SubscriberOptions(enable_open_telemetry_tracing=True), + ) + assert client._open_telemetry_enabled is True + + client = subscriber.Client(credentials=creds) + assert client._open_telemetry_enabled is False + + def test_class_method_factory(): patch = mock.patch( "google.oauth2.service_account.Credentials.from_service_account_file"