Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a7ed13b
feat: Add Publish Create Span for OTel
mukund-ananthu Jun 15, 2024
524a5e1
Add Publisher flow control span
mukund-ananthu Jun 19, 2024
8bfa0a9
Add publisher batching span
mukund-ananthu Jun 19, 2024
88890e0
Add publish batching span exception recording
mukund-ananthu Jun 19, 2024
792346d
Use local vars for spans instead of member vars
mukund-ananthu Jun 20, 2024
becf0ce
Start publisher batching span from time batch lock is acquired
mukund-ananthu Jun 20, 2024
fa4f506
Add trace context propagation
mukund-ananthu Jun 21, 2024
2fafbc9
Add publish RPC span
mukund-ananthu Jun 22, 2024
ba252cc
Fix lint
mukund-ananthu Jun 22, 2024
f6df5a6
Fix broken tests
mukund-ananthu Jun 22, 2024
8f1f812
Update publish RPC span logic
mukund-ananthu Jun 24, 2024
2ab1518
Add subscriber option to enable open telemetry
mukund-ananthu Jun 24, 2024
7b97217
Add messaging.message.id attribute in the publisher create spans
mukund-ananthu Jun 25, 2024
1f96443
Add subscribe span and record modack events
mukund-ananthu Jun 26, 2024
0b3362a
Add tests for OtelContextGetter
mukund-ananthu Jun 26, 2024
81a96b4
Update the trace names to be the name of the Pub/Sub package
mukund-ananthu Jun 27, 2024
9c0ac12
Modify subscriber Message to contain OpenTelemetryData
mukund-ananthu Jun 28, 2024
0ee007f
Add subscriber concurrency control span
mukund-ananthu Jul 1, 2024
ddab023
Add scheduler span
mukund-ananthu Jul 2, 2024
a7665b6
Refactor _wrap_callback_errors to contain subscription
mukund-ananthu Jul 2, 2024
cf6e486
Add process span
mukund-ananthu Jul 4, 2024
9d7dfdd
Add ack called event to process span
mukund-ananthu Jul 6, 2024
0801c4d
Add ack called event for ack_with_response in process span
mukund-ananthu Jul 6, 2024
857d163
Add nack called event for process span
mukund-ananthu Jul 6, 2024
c9625ff
Fix build failures
mukund-ananthu Jul 6, 2024
6d5d40c
Fix parent setting
mukund-ananthu Jul 7, 2024
f5d13df
Add modack span for receipt modack
mukund-ananthu Jul 7, 2024
6eb054a
Plumb subscriber span information required for modack spans
mukund-ananthu Jul 7, 2024
5882f5f
Add non-receipt modack spans(lease management)
mukund-ananthu Jul 8, 2024
37c16a1
Disable OpenTelemetry Tracing for Python versions <= 3.7
mukund-ananthu Jul 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
15 changes: 15 additions & 0 deletions google/cloud/pubsub_v1/opentelemetry/context_propagation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from opentelemetry.propagators.textmap import Getter
from google.pubsub_v1 import types as gapic_types
import typing


class OTelContextGetter(Getter):
"""Used to extract Open Telemetry Context from message attributes."""

def get(
self, carrier: gapic_types.PubsubMessage, key: str
) -> typing.Optional[typing.List[str]]:
return [carrier.attributes["googclient_" + key]]

def keys(self, carrier: gapic_types.PubsubMessage) -> typing.List[str]:
return list(carrier.attributes.keys())
16 changes: 16 additions & 0 deletions google/cloud/pubsub_v1/opentelemetry/publish_message_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from google.pubsub_v1 import types as gapic_types
from opentelemetry import trace
from dataclasses import dataclass, field

from typing import Optional


@dataclass
class PublishMessageWrapper:
"""
Wraps Pub/Sub message with additional metadata required for
Open Telemetry tracing.
"""

message: gapic_types.PubsubMessage
span: Optional[trace.Span] = field(default=None)
22 changes: 22 additions & 0 deletions google/cloud/pubsub_v1/opentelemetry/subscribe_spans_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from dataclasses import dataclass
from typing import Optional

from opentelemetry.trace.span import Span


@dataclass
class OpenTelemetryData:
"""
This class is for internal use by the library only.
Contains Open Telmetry data associated with a
google.cloud.pubsub_v1.subscriber.message.Message. Specifically it contains
the subscriber side spans associated with the message.

This is so that the subsriber side spans can be ended by the library
after receiving the message back via an ack(), ack_with_response(), nack().
"""

subscribe_span: Optional[Span] = None
concurrency_control_span: Optional[Span] = None
scheduler_span: Optional[Span] = None
process_span: Optional[Span] = None
7 changes: 5 additions & 2 deletions google/cloud/pubsub_v1/publisher/_batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
from google.pubsub_v1 import types as gapic_types
from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import (
PublishMessageWrapper,
)


class Batch(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -54,7 +57,7 @@ class Batch(metaclass=abc.ABCMeta):

def __len__(self):
"""Return the number of messages currently in the batch."""
return len(self.messages)
return len(self.message_wrappers)

@staticmethod
@abc.abstractmethod
Expand All @@ -68,7 +71,7 @@ def make_lock(): # pragma: NO COVER

@property
@abc.abstractmethod
def messages(self) -> Sequence["gapic_types.PubsubMessage"]: # pragma: NO COVER
def message_wrappers(self) -> Sequence["PublishMessageWrapper"]: # pragma: NO COVER
"""Return the messages currently in the batch.

Returns:
Expand Down
96 changes: 82 additions & 14 deletions google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@
import logging
import threading
import time
from datetime import datetime
import typing
from typing import Any, Callable, List, Optional, Sequence

from opentelemetry import trace

import google.api_core.exceptions
from google.api_core import gapic_v1
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import (
PublishMessageWrapper,
)
from google.cloud.pubsub_v1.publisher._batch import base
from google.pubsub_v1 import types as gapic_types

Expand Down Expand Up @@ -108,7 +114,7 @@ def __init__(
# status changed from ACCEPTING_MESSAGES to any other
# in order to avoid race conditions
self._futures: List[futures.Future] = []
self._messages: List[gapic_types.PubsubMessage] = []
self._message_wrappers: List[PublishMessageWrapper] = []
self._status = base.BatchStatus.ACCEPTING_MESSAGES

# The initial size is not zero, we need to account for the size overhead
Expand All @@ -134,9 +140,9 @@ def client(self) -> "PublisherClient":
return self._client

@property
def messages(self) -> Sequence[gapic_types.PubsubMessage]:
def message_wrappers(self) -> Sequence[PublishMessageWrapper]:
"""The messages currently in the batch."""
return self._messages
return self._message_wrappers

@property
def settings(self) -> "types.BatchSettings":
Expand Down Expand Up @@ -259,7 +265,7 @@ def _commit(self) -> None:
# https://github.com/googleapis/google-cloud-python/issues/8036

# Sanity check: If there are no messages, no-op.
if not self._messages:
if not self._message_wrappers:
_LOGGER.debug("No messages to publish, exiting commit")
self._status = base.BatchStatus.SUCCESS
return
Expand All @@ -270,18 +276,76 @@ def _commit(self) -> None:

batch_transport_succeeded = True
try:
if self._client._open_telemetry_enabled:
tracer = trace.get_tracer("com.google.cloud.pubsub.v1")
links = []
for wrapper in self._message_wrappers:
span = wrapper.span
if span and span.is_recording():
links.append(trace.Link(span.get_span_context()))
with tracer.start_as_current_span(
name=f"{self._topic} publish",
attributes={
"messaging.system": "com.google.cloud.pubsub.v1",
"messaging.destination.name": self._topic,
"gcp.project_id": self._topic.split("/")[1],
"messaging.batch.message_count": len(self._message_wrappers),
"messaging.operation": "publish",
"code.function": "_commit",
},
links=links if len(links) > 0 else None,
kind=trace.SpanKind.CLIENT,
end_on_exit=False,
) as publish_rpc_span:
ctx = publish_rpc_span.get_span_context()
for wrapper in self._message_wrappers:
span = wrapper.span
if span and span.is_recording():
span.add_link(ctx)
# Performs retries for errors defined by the retry configuration.
response = self._client._gapic_publish(
topic=self._topic,
messages=self._messages,
messages=[wrapper.message for wrapper in self._message_wrappers],
retry=self._commit_retry,
timeout=self._commit_timeout,
)
if self._client._open_telemetry_enabled:
publish_rpc_span.end()
for message_id, wrapper in zip(
response.message_ids, self._message_wrappers
):
span = wrapper.span
if span:
span.add_event(
name="publish end",
attributes={
"timestamp": str(datetime.now()),
},
)
span.set_attribute(key="messaging.message.id", value=message_id)
span.end()
except google.api_core.exceptions.GoogleAPIError as exc:
# We failed to publish, even after retries, so set the exception on
# all futures and exit.
self._status = base.BatchStatus.ERROR

if self._client._open_telemetry_enabled:
publish_rpc_span.record_exception(
exception=exc,
)
publish_rpc_span.set_status(
trace.Status(status_code=trace.StatusCode.ERROR)
)
publish_rpc_span.end()
for wrapper in self._message_wrappers:
span = wrapper.span
if span:
span.record_exception(exception=exc)
span.set_status(
trace.Status(status_code=trace.StatusCode.ERROR),
)
span.end()

batch_transport_succeeded = False
if self._batch_done_callback is not None:
# Failed to publish batch.
Expand Down Expand Up @@ -326,7 +390,8 @@ def _commit(self) -> None:
self._batch_done_callback(batch_transport_succeeded)

def publish(
self, message: gapic_types.PubsubMessage
self,
message_wrapper: PublishMessageWrapper,
) -> Optional["pubsub_v1.publisher.futures.Future"]:
"""Publish a single message.

Expand All @@ -351,12 +416,16 @@ def publish(
"""

# Coerce the type, just in case.
if not isinstance(message, gapic_types.PubsubMessage):
if not isinstance(
message_wrapper.message, gapic_types.PubsubMessage
): # pragma: NO COVER
# For performance reasons, the message should be constructed by directly
# using the raw protobuf class, and only then wrapping it into the
# higher-level PubsubMessage class.
vanilla_pb = _raw_proto_pubbsub_message(**message)
message = gapic_types.PubsubMessage.wrap(vanilla_pb)
vanilla_pb = _raw_proto_pubbsub_message(**message_wrapper.message)
message_wrapper.message = vanilla_pb.gapic_types.PubsubMessage.wrap(
vanilla_pb
)

future = None

Expand All @@ -369,7 +438,7 @@ def publish(
return None

size_increase = gapic_types.PublishRequest(
messages=[message]
messages=[message_wrapper.message]
)._pb.ByteSize()

if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES:
Expand All @@ -381,16 +450,15 @@ def publish(
raise exceptions.MessageTooLargeError(err_msg)

new_size = self._size + size_increase
new_count = len(self._messages) + 1
new_count = len(self._message_wrappers) + 1

size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES)
overflow = new_size > size_limit or new_count >= self.settings.max_messages

if not self._messages or not overflow:
if not self._message_wrappers or not overflow:
# Store the actual message in the batch's message queue.
self._messages.append(message)
self._message_wrappers.append(message_wrapper)
self._size = new_size

# Track the future on this batch (so that the result of the
# future can be set).
future = futures.Future()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base
from google.cloud.pubsub_v1.publisher._batch import base as batch_base
from google.pubsub_v1 import types as gapic_types
from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import (
PublishMessageWrapper,
)

if typing.TYPE_CHECKING: # pragma: NO COVER
from google.cloud.pubsub_v1 import types
Expand Down Expand Up @@ -262,7 +264,7 @@ def _create_batch(

def publish(
self,
message: gapic_types.PubsubMessage,
message: PublishMessageWrapper,
retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
) -> futures.Future:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
from google.api_core import gapic_v1

from google.cloud.pubsub_v1.publisher._sequencer import base
from google.pubsub_v1 import types as gapic_types
from google.cloud.pubsub_v1.opentelemetry.publish_message_wrapper import (
PublishMessageWrapper,
)


if typing.TYPE_CHECKING: # pragma: NO COVER
from google.cloud.pubsub_v1.publisher import _batch
Expand Down Expand Up @@ -115,7 +118,7 @@ def _create_batch(

def publish(
self,
message: gapic_types.PubsubMessage,
message: PublishMessageWrapper,
retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
) -> "futures.Future":
Expand Down
Loading