diff --git a/pyproject.toml b/pyproject.toml index 3c8a0214ed1f0a..fc622d5acea96f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -85,7 +85,7 @@ dependencies = [ "sentry-forked-email-reply-parser>=0.5.12.post1", "sentry-kafka-schemas>=2.1.15", "sentry-ophio>=1.1.3", - "sentry-protos>=0.4.6", + "sentry-protos>=0.4.7", "sentry-redis-tools>=0.5.0", "sentry-relay>=0.9.22", "sentry-sdk[http2]>=2.43.0", diff --git a/src/sentry/api/endpoints/organization_trace_item_attributes_ranked.py b/src/sentry/api/endpoints/organization_trace_item_attributes_ranked.py index 47e87ff8617565..5dc90397f9cf2a 100644 --- a/src/sentry/api/endpoints/organization_trace_item_attributes_ranked.py +++ b/src/sentry/api/endpoints/organization_trace_item_attributes_ranked.py @@ -5,21 +5,26 @@ from rest_framework.request import Request from rest_framework.response import Response +from sentry_protos.snuba.v1.endpoint_trace_item_attributes_pb2 import TraceItemAttributeNamesRequest from sentry_protos.snuba.v1.endpoint_trace_item_stats_pb2 import ( AttributeDistributionsRequest, StatsType, TraceItemStatsRequest, ) -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ExtrapolationMode +from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, ExtrapolationMode -from sentry import features +from sentry import features, options from sentry.api.api_owners import ApiOwner from sentry.api.api_publish_status import ApiPublishStatus from sentry.api.base import region_silo_endpoint from sentry.api.bases import NoProjects, OrganizationEventsEndpointBase +from sentry.api.endpoints.organization_trace_item_attributes import adjust_start_end_window +from sentry.api.utils import handle_query_errors from sentry.exceptions import InvalidSearchQuery from sentry.models.organization import Organization from sentry.search.eap.resolver import SearchResolver +from sentry.search.eap.spans.attributes import SPANS_STATS_EXCLUDED_ATTRIBUTES from sentry.search.eap.spans.definitions import SPAN_DEFINITIONS from sentry.search.eap.types import SearchResolverConfig, SupportedTraceItemType from sentry.search.eap.utils import can_expose_attribute, translate_internal_to_public_alias @@ -28,10 +33,13 @@ from sentry.seer.workflows.compare import keyed_rrf_score from sentry.snuba.referrer import Referrer from sentry.snuba.spans_rpc import Spans +from sentry.utils import snuba_rpc from sentry.utils.snuba_rpc import trace_item_stats_rpc logger = logging.getLogger(__name__) +PARALLELIZATION_FACTOR = 2 + @region_silo_endpoint class OrganizationTraceItemsAttributesRankedEndpoint(OrganizationEventsEndpointBase): @@ -121,75 +129,111 @@ def get(self, request: Request, organization: Organization) -> Response: return Response({"rankedAttributes": []}) cohort_1, _, _ = resolver.resolve_query(query_1) - cohort_1_request = TraceItemStatsRequest( - filter=cohort_1, - meta=meta, - stats_types=[ - StatsType( - attribute_distributions=AttributeDistributionsRequest( - max_buckets=75, - ) - ) - ], + cohort_2, _, _ = resolver.resolve_query(query_2) + + # Fetch attribute names for parallelization + adjusted_start_date, adjusted_end_date = adjust_start_end_window( + snuba_params.start_date, snuba_params.end_date + ) + attrs_snuba_params = snuba_params.copy() + attrs_snuba_params.start = adjusted_start_date + attrs_snuba_params.end = adjusted_end_date + attrs_resolver = SearchResolver( + params=attrs_snuba_params, config=resolver_config, definitions=SPAN_DEFINITIONS ) + attrs_meta = attrs_resolver.resolve_meta( + referrer=Referrer.API_SPANS_FREQUENCY_STATS_RPC.value + ) + attrs_meta.trace_item_type = TraceItemType.TRACE_ITEM_TYPE_SPAN - cohort_2, _, _ = resolver.resolve_query(query_2) - cohort_2_request = TraceItemStatsRequest( - filter=cohort_2, - meta=meta, - stats_types=[ - StatsType( - attribute_distributions=AttributeDistributionsRequest( - max_buckets=75, - ) + attr_type = AttributeKey.Type.TYPE_STRING + max_attributes = options.get("explore.trace-items.keys.max") + + with handle_query_errors(): + attrs_request = TraceItemAttributeNamesRequest( + meta=attrs_meta, + limit=max_attributes, + type=attr_type, + intersecting_attributes_filter=cohort_2, + ) + attrs_response = snuba_rpc.attribute_names_rpc(attrs_request) + + # Chunk attributes for parallel processing + chunked_attributes: defaultdict[int, list[AttributeKey]] = defaultdict(list) + for i, attr_proto in enumerate(attrs_response.attributes): + if attr_proto.name in SPANS_STATS_EXCLUDED_ATTRIBUTES: + continue + + chunked_attributes[i % PARALLELIZATION_FACTOR].append( + AttributeKey(name=attr_proto.name, type=AttributeKey.TYPE_STRING) + ) + + def run_stats_request_with_error_handling(filter, attributes): + with handle_query_errors(): + request = TraceItemStatsRequest( + filter=filter, + meta=meta, + stats_types=[ + StatsType( + attribute_distributions=AttributeDistributionsRequest( + max_buckets=75, + attributes=attributes, + ) + ) + ], + ) + return trace_item_stats_rpc(request) + + def run_table_query_with_error_handling(query_string): + with handle_query_errors(): + return Spans.run_table_query( + params=snuba_params, + query_string=query_string, + selected_columns=["count(span.duration)"], + orderby=None, + config=resolver_config, + offset=0, + limit=1, + sampling_mode=snuba_params.sampling_mode, + referrer=Referrer.API_SPAN_SAMPLE_GET_SPAN_DATA.value, ) - ], - ) with ThreadPoolExecutor( thread_name_prefix=__name__, - max_workers=4, + max_workers=PARALLELIZATION_FACTOR * 2 + 2, # 2 cohorts * threads + 2 totals queries ) as query_thread_pool: - cohort_1_future = query_thread_pool.submit( - trace_item_stats_rpc, - cohort_1_request, - ) - totals_1_future = query_thread_pool.submit( - Spans.run_table_query, - params=snuba_params, - query_string=query_1, - selected_columns=["count(span.duration)"], - orderby=None, - config=resolver_config, - offset=0, - limit=1, - sampling_mode=snuba_params.sampling_mode, - referrer=Referrer.API_SPAN_SAMPLE_GET_SPAN_DATA.value, - ) + cohort_1_futures = [ + query_thread_pool.submit( + run_stats_request_with_error_handling, cohort_1, attributes + ) + for attributes in chunked_attributes.values() + ] + cohort_2_futures = [ + query_thread_pool.submit( + run_stats_request_with_error_handling, cohort_2, attributes + ) + for attributes in chunked_attributes.values() + ] - cohort_2_future = query_thread_pool.submit( - trace_item_stats_rpc, - cohort_2_request, - ) + totals_1_future = query_thread_pool.submit(run_table_query_with_error_handling, query_1) + totals_2_future = query_thread_pool.submit(run_table_query_with_error_handling, query_2) - totals_2_future = query_thread_pool.submit( - Spans.run_table_query, - params=snuba_params, - query_string=query_2, - selected_columns=["count(span.duration)"], - orderby=None, - config=resolver_config, - offset=0, - limit=1, - sampling_mode=snuba_params.sampling_mode, - referrer=Referrer.API_SPAN_SAMPLE_GET_SPAN_DATA.value, - ) + # Merge cohort 1 results + cohort_1_data = [] + for future in cohort_1_futures: + result = future.result() + if result.results: + cohort_1_data.extend(result.results[0].attribute_distributions.attributes) - cohort_1_data = cohort_1_future.result() - cohort_2_data = cohort_2_future.result() + # Merge cohort 2 results + cohort_2_data = [] + for future in cohort_2_futures: + result = future.result() + if result.results: + cohort_2_data.extend(result.results[0].attribute_distributions.attributes) - totals_1_result = totals_1_future.result() - totals_2_result = totals_2_future.result() + totals_1_result = totals_1_future.result() + totals_2_result = totals_2_future.result() cohort_1_distribution = [] cohort_1_distribution_map = defaultdict(list) @@ -198,7 +242,7 @@ def get(self, request: Request, organization: Organization) -> Response: cohort_2_distribution_map = defaultdict(list) processed_cohort_2_buckets = set() - for attribute in cohort_2_data.results[0].attribute_distributions.attributes: + for attribute in cohort_2_data: if not can_expose_attribute(attribute.attribute_name, SupportedTraceItemType.SPANS): continue @@ -207,7 +251,7 @@ def get(self, request: Request, organization: Organization) -> Response: {"label": bucket.label, "value": bucket.value} ) - for attribute in cohort_1_data.results[0].attribute_distributions.attributes: + for attribute in cohort_1_data: if not can_expose_attribute(attribute.attribute_name, SupportedTraceItemType.SPANS): continue for bucket in attribute.buckets: @@ -287,7 +331,7 @@ def get(self, request: Request, organization: Organization) -> Response: # Create RRR order mapping from compare_distributions results # scored_attrs_rrr returns a dict with 'results' key containing list of [attribute_name, score] pairs rrr_results = scored_attrs_rrr.get("results", []) - rrr_order_map = {attr: i for i, (attr, _) in enumerate(rrr_results)} + rrr_order_map = {attr_name: i for i, (attr_name, _) in enumerate(rrr_results)} ranked_distribution: dict[str, Any] = { "rankedAttributes": [], @@ -300,7 +344,8 @@ def get(self, request: Request, organization: Organization) -> Response: "cohort2Total": total_baseline, } - for i, (attr, _) in enumerate(scored_attrs_rrf): + for i, scored_attr_tuple in enumerate(scored_attrs_rrf): + attr = scored_attr_tuple[0] public_alias, _, _ = translate_internal_to_public_alias( attr, "string", SupportedTraceItemType.SPANS diff --git a/src/sentry/api/endpoints/organization_trace_item_stats.py b/src/sentry/api/endpoints/organization_trace_item_stats.py index 47f105947aafaf..0814d9b14380b9 100644 --- a/src/sentry/api/endpoints/organization_trace_item_stats.py +++ b/src/sentry/api/endpoints/organization_trace_item_stats.py @@ -1,24 +1,37 @@ import logging +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed from rest_framework import serializers from rest_framework.request import Request from rest_framework.response import Response +from sentry_protos.snuba.v1.endpoint_trace_item_attributes_pb2 import TraceItemAttributeNamesRequest +from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey +from sentry import options from sentry.api.api_owners import ApiOwner from sentry.api.api_publish_status import ApiPublishStatus from sentry.api.base import region_silo_endpoint from sentry.api.bases import NoProjects, OrganizationEventsEndpointBase +from sentry.api.endpoints.organization_trace_item_attributes import adjust_start_end_window +from sentry.api.utils import handle_query_errors from sentry.models.organization import Organization from sentry.search.eap.constants import SUPPORTED_STATS_TYPES from sentry.search.eap.resolver import SearchResolver +from sentry.search.eap.spans.attributes import SPANS_STATS_EXCLUDED_ATTRIBUTES from sentry.search.eap.spans.definitions import SPAN_DEFINITIONS from sentry.search.eap.types import SearchResolverConfig from sentry.snuba.referrer import Referrer from sentry.snuba.spans_rpc import Spans +from sentry.utils import snuba_rpc logger = logging.getLogger(__name__) +MAX_THREADS = 4 + + class OrganizationTraceItemsStatsSerializer(serializers.Serializer): query = serializers.CharField(required=False) statsType = serializers.ListField( @@ -49,13 +62,74 @@ def get(self, request: Request, organization: Organization) -> Response: params=snuba_params, config=resolver_config, definitions=SPAN_DEFINITIONS ) - stats_results = Spans.run_stats_query( - params=snuba_params, - stats_types=serialized.get("statsType"), - query_string=serialized.get("query", ""), - referrer=Referrer.API_SPANS_FREQUENCY_STATS_RPC.value, - config=resolver_config, - search_resolver=resolver, + query_string = serialized.get("query") + query_filter, _, _ = resolver.resolve_query(query_string) + + # Fetch attribute names + adjusted_start_date, adjusted_end_date = adjust_start_end_window( + snuba_params.start_date, snuba_params.end_date + ) + attrs_snuba_params = snuba_params.copy() + attrs_snuba_params.start = adjusted_start_date + attrs_snuba_params.end = adjusted_end_date + attrs_resolver = SearchResolver( + params=attrs_snuba_params, config=resolver_config, definitions=SPAN_DEFINITIONS ) + attrs_meta = attrs_resolver.resolve_meta( + referrer=Referrer.API_SPANS_FREQUENCY_STATS_RPC.value + ) + attrs_meta.trace_item_type = TraceItemType.TRACE_ITEM_TYPE_SPAN + + attr_type = AttributeKey.Type.TYPE_STRING + max_attributes = options.get("explore.trace-items.keys.max") + + with handle_query_errors(): + attrs_request = TraceItemAttributeNamesRequest( + meta=attrs_meta, + limit=max_attributes, + type=attr_type, + intersecting_attributes_filter=query_filter, + ) + + attrs_response = snuba_rpc.attribute_names_rpc(attrs_request) + + # Chunk attributes and run stats query in parallel + chunked_attributes: defaultdict[int, list[AttributeKey]] = defaultdict(list) + for i, attr in enumerate(attrs_response.attributes): + if attr.name in SPANS_STATS_EXCLUDED_ATTRIBUTES: + continue + + chunked_attributes[i % MAX_THREADS].append( + AttributeKey(name=attr.name, type=AttributeKey.TYPE_STRING) + ) + + def run_stats_query_with_error_handling(attributes): + with handle_query_errors(): + return Spans.run_stats_query( + params=snuba_params, + stats_types=serialized.get("statsType"), + query_string=serialized.get("query", ""), + referrer=Referrer.API_SPANS_FREQUENCY_STATS_RPC.value, + config=resolver_config, + search_resolver=resolver, + attributes=attributes, + ) + + stats_results: dict[str, dict[str, dict]] = defaultdict(lambda: {"data": {}}) + with ThreadPoolExecutor( + thread_name_prefix=__name__, + max_workers=MAX_THREADS, + ) as query_thread_pool: + + futures = [ + query_thread_pool.submit(run_stats_query_with_error_handling, attributes) + for attributes in chunked_attributes.values() + ] + + for future in as_completed(futures): + result = future.result() + for stats in result: + for stats_type, data in stats.items(): + stats_results[stats_type]["data"].update(data["data"]) - return Response({"data": stats_results}) + return Response({"data": [{k: v} for k, v in stats_results.items()]}) diff --git a/src/sentry/search/eap/spans/attributes.py b/src/sentry/search/eap/spans/attributes.py index c5d089b116775d..388efcc465e2ee 100644 --- a/src/sentry/search/eap/spans/attributes.py +++ b/src/sentry/search/eap/spans/attributes.py @@ -634,6 +634,18 @@ def is_starred_segment_context_constructor(params: SnubaParams) -> VirtualColumn if definition.replacement } +# Attributes excluded from stats queries (e.g., attribute distributions) +# These are typically system-level identifiers that don't provide useful distribution insights +SPANS_STATS_EXCLUDED_ATTRIBUTES: set[str] = { + "sentry.item_id", + "sentry.trace_id", + "sentry.segment_id", + "sentry.parent_span_id", + "sentry.profile_id", + "sentry.event_id", + "sentry.group", +} + SPAN_VIRTUAL_CONTEXTS = { "device.class": VirtualColumnDefinition( diff --git a/src/sentry/snuba/spans_rpc.py b/src/sentry/snuba/spans_rpc.py index 775126d6d6a5d1..660002d875226b 100644 --- a/src/sentry/snuba/spans_rpc.py +++ b/src/sentry/snuba/spans_rpc.py @@ -12,6 +12,7 @@ TraceItemStatsRequest, ) from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey from sentry import options from sentry.exceptions import InvalidSearchQuery @@ -287,6 +288,7 @@ def run_stats_query( referrer: str, config: SearchResolverConfig, search_resolver: SearchResolver | None = None, + attributes: list[AttributeKey] | None = None, ) -> list[dict[str, Any]]: search_resolver = search_resolver or cls.get_resolver(params, config) stats_filter, _, _ = search_resolver.resolve_query(query_string) @@ -308,6 +310,7 @@ def run_stats_query( StatsType( attribute_distributions=AttributeDistributionsRequest( max_buckets=75, + attributes=attributes, ) ) ) @@ -319,7 +322,7 @@ def run_stats_query( if "attributeDistributions" in stats_types and result.HasField( "attribute_distributions" ): - attributes = defaultdict(list) + attrs = defaultdict(list) for attribute in result.attribute_distributions.attributes: if not can_expose_attribute( attribute.attribute_name, SupportedTraceItemType.SPANS @@ -327,9 +330,9 @@ def run_stats_query( continue for bucket in attribute.buckets: - attributes[attribute.attribute_name].append( + attrs[attribute.attribute_name].append( {"label": bucket.label, "value": bucket.value} ) - stats.append({"attribute_distributions": {"data": attributes}}) + stats.append({"attribute_distributions": {"data": attrs}}) return stats diff --git a/uv.lock b/uv.lock index 0c21ea09561d09..9a4631f17ae283 100644 --- a/uv.lock +++ b/uv.lock @@ -2196,7 +2196,7 @@ requires-dist = [ { name = "sentry-forked-email-reply-parser", specifier = ">=0.5.12.post1" }, { name = "sentry-kafka-schemas", specifier = ">=2.1.15" }, { name = "sentry-ophio", specifier = ">=1.1.3" }, - { name = "sentry-protos", specifier = ">=0.4.6" }, + { name = "sentry-protos", specifier = ">=0.4.7" }, { name = "sentry-redis-tools", specifier = ">=0.5.0" }, { name = "sentry-relay", specifier = ">=0.9.22" }, { name = "sentry-sdk", extras = ["http2"], specifier = ">=2.43.0" }, @@ -2401,7 +2401,7 @@ wheels = [ [[package]] name = "sentry-protos" -version = "0.4.6" +version = "0.4.8" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "grpc-stubs", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -2409,7 +2409,7 @@ dependencies = [ { name = "protobuf", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/sentry_protos-0.4.6-py3-none-any.whl", hash = "sha256:40775ce4c889bbc85b2c5486ee94f246adfa1f14b77212253256bbd28c076ee1" }, + { url = "https://pypi.devinfra.sentry.io/wheels/sentry_protos-0.4.8-py3-none-any.whl", hash = "sha256:c189335d346bafabe7a96e9f5086a05a179c36261b55d942fcac58bac1e46bd7" }, ] [[package]]