Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import static datadog.communication.http.OkHttpUtils.msgpackRequestBodyOf;
import static datadog.communication.http.OkHttpUtils.prepareRequest;
import static datadog.communication.serialization.msgpack.MsgPackWriter.FIXARRAY;
import static datadog.trace.api.ProtocolVersion.V0_4;
import static datadog.trace.api.ProtocolVersion.V1_0;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -118,7 +118,7 @@ public DDAgentFeaturesDiscovery(
this.agentBaseUrl = agentUrl;
this.metricsEnabled = metricsEnabled;
this.ignoreAgentVersionForStats = ignoreAgentVersionForStats;
this.protocolVersion = protocolVersion != null ? protocolVersion : V0_4;
this.protocolVersion = protocolVersion != null ? protocolVersion : V1_0;
this.discoveryTimer = monitoring.newTimer("trace.agent.discovery.time");
this.discoveryState = new State();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
V1_0 | V1_ENDPOINT
}

def "null protocol version falls back to v0.4 trace endpoints"() {
def "null protocol version falls back to v1.0 trace endpoints"() {
setup:
OkHttpClient client = Mock(OkHttpClient)
DDAgentFeaturesDiscovery features =
Expand All @@ -99,9 +99,10 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {

then:
1 * client.newCall({ Request request -> request.url().toString() == "http://localhost:8125/info" }) >> { Request request -> infoResponse(request, "{}") }
1 * client.newCall({ Request request -> request.url().toString() == "http://localhost:8125/v1.0/traces" }) >> { Request request -> success(request) }
0 * client.newCall({ Request request -> request.url().toString() == "http://localhost:8125/v0.5/traces" }) >> { Request request -> success(request) }
1 * client.newCall({ Request request -> request.url().toString() == "http://localhost:8125/v0.4/traces" }) >> { Request request -> success(request) }
features.getTraceEndpoint() == V04_ENDPOINT
0 * client.newCall({ Request request -> request.url().toString() == "http://localhost:8125/v0.4/traces" }) >> { Request request -> success(request) }
features.getTraceEndpoint() == V1_ENDPOINT
0 * _
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static datadog.trace.api.DDTags.ERROR_MSG;
import static datadog.trace.api.DDTags.ERROR_STACK;
import static datadog.trace.api.DDTags.ERROR_TYPE;
import static datadog.trace.api.DDTags.SPAN_EVENTS;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER;
Expand Down Expand Up @@ -136,11 +135,15 @@ public static void applyNamingConvention(AgentSpan span) {
}
}

public static void setEventsAsTag(AgentSpan span, List<OtelSpanEvent> events) {
public static void recordSpanEvents(AgentSpan span, List<OtelSpanEvent> events) {
if (events == null || events.isEmpty()) {
return;
}
span.setTag(SPAN_EVENTS, OtelSpanEvent.toTag(events));
// Hand the structured events to the span. The V1 payload encodes them natively, while v0.x
// payloads flatten them into the JSON `events` tag at serialization time (see DDSpanContext).
for (OtelSpanEvent event : events) {
span.addSpanEvent(event);
}
}

public static void applySpanEventExceptionAttributesAsTags(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import static datadog.opentelemetry.shim.trace.OtelConventions.applyNamingConvention;
import static datadog.opentelemetry.shim.trace.OtelConventions.applyReservedAttribute;
import static datadog.opentelemetry.shim.trace.OtelConventions.applySpanEventExceptionAttributesAsTags;
import static datadog.opentelemetry.shim.trace.OtelConventions.setEventsAsTag;
import static datadog.opentelemetry.shim.trace.OtelConventions.recordSpanEvents;
import static datadog.opentelemetry.shim.trace.OtelSpanEvent.EXCEPTION_SPAN_EVENT_NAME;
import static datadog.opentelemetry.shim.trace.OtelSpanEvent.initializeExceptionAttributes;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
Expand Down Expand Up @@ -179,7 +179,7 @@ public AgentSpan asAgentSpan() {
@Override
public void onSpanFinished() {
applyNamingConvention(this.delegate);
setEventsAsTag(this.delegate, this.events);
recordSpanEvents(this.delegate, this.events);
}

private static class NoopSpan implements Span {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@

import datadog.trace.api.time.SystemTimeSource;
import datadog.trace.api.time.TimeSource;
import edu.umd.cs.findbugs.annotations.NonNull;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanEvent;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class OtelSpanEvent {
public class OtelSpanEvent implements AgentSpanEvent {
public static final String EXCEPTION_SPAN_EVENT_NAME = "exception";
public static final AttributeKey<String> EXCEPTION_MESSAGE_ATTRIBUTE_KEY =
AttributeKey.stringKey("exception.message");
Expand All @@ -26,33 +28,53 @@ public class OtelSpanEvent {
private static TimeSource timeSource = SystemTimeSource.INSTANCE;

private final String name;
private final String attributes;
private final Attributes attributes;

/** Event timestamp in nanoseconds. */
private final long timestamp;

public OtelSpanEvent(String name, Attributes attributes) {
this.name = name;
this.attributes = AttributesJsonParser.toJson(attributes);
this.timestamp = OtelSpanEvent.timeSource.getCurrentTimeNanos();
this(name, attributes, OtelSpanEvent.timeSource.getCurrentTimeNanos());
}

public OtelSpanEvent(String name, Attributes attributes, long timestamp, TimeUnit unit) {
this(name, attributes, unit.toNanos(timestamp));
}

private OtelSpanEvent(String name, Attributes attributes, long timestampNanos) {
this.name = name;
this.attributes = AttributesJsonParser.toJson(attributes);
this.timestamp = unit.toNanos(timestamp);
this.attributes = attributes;
this.timestamp = timestampNanos;
}

@NonNull
public static String toTag(List<OtelSpanEvent> events) {
StringBuilder builder = new StringBuilder("[");
for (OtelSpanEvent event : events) {
if (builder.length() > 1) {
builder.append(',');
}
builder.append(event.toJson());
@Override
public long timeNanos() {
return this.timestamp;
}

@Override
public String name() {
return this.name;
}

/**
* Exposes the event attributes as typed values for native (V1) encoding. OpenTelemetry attribute
* values are already {@link String}, {@link Boolean}, {@link Long}, {@link Double} or a {@link
* List} of those, so they are passed through unchanged.
*/
@Override
public Map<String, Object> attributes() {
if (this.attributes == null || this.attributes.isEmpty()) {
return Collections.emptyMap();
}
return builder.append(']').toString();
Map<String, Object> map = new LinkedHashMap<>(this.attributes.size());
this.attributes.forEach((key, value) -> map.put(key.getKey(), value));
return map;
}

@Override
public CharSequence toJsonTag() {
return toJson();
}

/**
Expand Down Expand Up @@ -174,8 +196,9 @@ public String toJson() {
StringBuilder builder =
new StringBuilder(
"{\"time_unix_nano\":" + this.timestamp + ",\"name\":\"" + this.name + "\"");
if (!this.attributes.isEmpty()) {
builder.append(",\"attributes\":").append(this.attributes);
String attributesJson = AttributesJsonParser.toJson(this.attributes);
if (!attributesJson.isEmpty()) {
builder.append(",\"attributes\":").append(attributesJson);
}
return builder.append('}').toString();
}
Expand All @@ -186,8 +209,8 @@ public String toString() {
+ this.timestamp
+ ", name='"
+ this.name
+ "', attributes='"
+ "', attributes="
+ this.attributes
+ "'}";
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public final class ConfigDefaults {
Arrays.asList("user.id", "session.id", "account.id");
static final boolean DEFAULT_JMX_FETCH_ENABLED = true;

static final String DEFAULT_TRACE_AGENT_PROTOCOL_VERSION = ProtocolVersion.V0_4.asConfigValue();
static final String DEFAULT_TRACE_AGENT_PROTOCOL_VERSION = ProtocolVersion.V1_0.asConfigValue();

static final boolean DEFAULT_CLIENT_IP_ENABLED = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import datadog.trace.api.ProcessTags;
import datadog.trace.api.TagMap;
import datadog.trace.api.sampling.SamplingMechanism;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanEvent;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanLink;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.bootstrap.instrumentation.api.Tags;
Expand Down Expand Up @@ -89,7 +90,7 @@ public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
}

CoreSpan<?> firstSpan = trace.get(0);
firstSpan.processTagsAndBaggage(spanMetadata, false, false);
firstSpan.processTagsAndBaggageWithStructuredLinks(spanMetadata);
Metadata firstSpanMeta = spanMetadata.metadata;

// encoded fields: 1..7, but skipping #5, as not required by tracers and set by the agent.
Expand Down Expand Up @@ -128,7 +129,7 @@ private void encodeSpans(Writable writable, int fieldId, List<? extends CoreSpan
Metadata meta = spanMetadata.metadata;
for (CoreSpan<?> span : spans) {
if (meta == null) {
span.processTagsAndBaggage(spanMetadata, false, false);
span.processTagsAndBaggageWithStructuredLinks(spanMetadata);
meta = spanMetadata.metadata;
}
TagMap tags = meta.getTags();
Expand Down Expand Up @@ -162,7 +163,7 @@ private void encodeSpans(Writable writable, int fieldId, List<? extends CoreSpan
// links = 11, a collection of links to other spans
encodeSpanLinks(writable, 11, meta.getSpanLinks());
// events = 12, a collection of events that occurred during this span
encodeSpanEvents(writable, 12, tags.getObject(DDTags.SPAN_EVENTS));
encodeSpanEvents(writable, 12, meta.getSpanEvents());
// env = 13, the optional string environment of this span
encodeString(writable, 13, tags.getString(Tags.ENV));
// version = 14, the optional string version of this span
Expand Down Expand Up @@ -200,50 +201,23 @@ private void encodeSpanLinks(
}
}

private void encodeSpanEvents(Writable writable, int fieldId, Object eventsObject) {
private void encodeSpanEvents(
Writable writable, int fieldId, List<? extends AgentSpanEvent> events) {
writable.writeInt(fieldId);
if (!(eventsObject instanceof List) || ((List<?>) eventsObject).isEmpty()) {
if (events == null || events.isEmpty()) {
writable.startArray(0);
return;
}

List<?> events = (List<?>) eventsObject;
int encodableCount = 0;
for (Object event : events) {
if (isEncodableSpanEvent(event)) {
encodableCount++;
}
}
writable.startArray(encodableCount);
for (Object event : events) {
if (!(event instanceof Map)) {
continue;
}
Map<?, ?> eventMap = (Map<?, ?>) event;
Long timeUnixNano = asLong(eventMap.get("time_unix_nano"));
Object nameObject = eventMap.get("name");
if (timeUnixNano == null || nameObject == null) {
continue;
}

Map<?, ?> attributes =
eventMap.get("attributes") instanceof Map ? (Map<?, ?>) eventMap.get("attributes") : null;

writable.startArray(events.size());
for (AgentSpanEvent event : events) {
writable.startMap(3);
encodeLong(writable, 1, timeUnixNano);
encodeString(writable, 2, String.valueOf(nameObject));
encodeEventAttributes(writable, 3, attributes);
encodeLong(writable, 1, event.timeNanos());
encodeString(writable, 2, event.name());
encodeEventAttributes(writable, 3, event.attributes());
}
}

private boolean isEncodableSpanEvent(Object event) {
if (!(event instanceof Map)) {
return false;
}
Map<?, ?> eventMap = (Map<?, ?>) event;
return eventMap.get("name") != null && asLong(eventMap.get("time_unix_nano")) != null;
}

private void encodeEventAttributes(Writable writable, int fieldId, Map<?, ?> attrs) {
writable.writeInt(fieldId);
if (attrs == null || attrs.isEmpty()) {
Expand Down Expand Up @@ -340,20 +314,6 @@ private boolean isIntegralNumber(Number number) {
return !(number instanceof Float || number instanceof Double);
}

private Long asLong(Object value) {
if (value instanceof Number) {
return ((Number) value).longValue();
}
if (value instanceof CharSequence) {
try {
return Long.parseLong(value.toString());
} catch (NumberFormatException ignored) {
return null;
}
}
return null;
}

private void encodeSpanAttributes(
Writable writable, int fieldId, Metadata meta, Map<String, Object> metaStruct) {
TagMap tags = meta.getTags();
Expand Down
14 changes: 12 additions & 2 deletions dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,18 @@ default boolean isKind(SpanKindFilter filter) {

void processTagsAndBaggage(MetadataConsumer consumer);

void processTagsAndBaggage(
MetadataConsumer consumer, boolean injectLinksAsTags, boolean injectBaggageAsTags);
/**
* Variant of {@link #processTagsAndBaggage(MetadataConsumer)} for protocols that serialize span
* links as first-class structured data rather than tags (currently the V1 trace payload). Span
* links are therefore NOT flattened into the {@code _dd.span_links} tag, and baggage is
* materialized as {@code baggage.*} span tags, regardless of the tracer's configured defaults.
*
* <p>The default implementation delegates to {@link #processTagsAndBaggage(MetadataConsumer)};
* implementations backed by real span configuration (e.g. {@code DDSpan}) override it.
*/
default void processTagsAndBaggageWithStructuredLinks(MetadataConsumer consumer) {
processTagsAndBaggage(consumer);
}

T setSamplingPriority(int samplingPriority, int samplingMechanism);

Expand Down
Loading
Loading