From 0241c0aa14447c99fccecc61e91b35051d6743be Mon Sep 17 00:00:00 2001 From: "saul.gill" Date: Mon, 22 Jan 2024 11:59:07 +0000 Subject: Addition of tracing for Kafka Added open telemetry-based interceptors Messages will be tagged with tracing information Issue-ID: POLICY-4922 Change-Id: If4234a642c3eb7dd6c3acaf2f06b2efb2ddef8af Signed-off-by: saul.gill --- .../event/comm/bus/internal/BusConsumer.java | 62 +++++++++++++++++++++- .../event/comm/bus/internal/BusPublisher.java | 6 +++ .../event/comm/bus/internal/BusTopicBase.java | 7 +++ .../event/comm/bus/internal/BusTopicParams.java | 7 +++ .../comm/bus/internal/InlineBusTopicSink.java | 1 + .../comm/bus/internal/InlineDmaapTopicSink.java | 3 ++ .../comm/bus/internal/InlineKafkaTopicSink.java | 1 + .../comm/bus/internal/InlineUebTopicSink.java | 2 + .../internal/SingleThreadedDmaapTopicSource.java | 3 +- .../internal/SingleThreadedKafkaTopicSource.java | 3 +- .../bus/internal/SingleThreadedUebTopicSource.java | 1 + 11 files changed, 92 insertions(+), 4 deletions(-) (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 79e374a2..d6fa0645 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -5,7 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved. - * Modifications Copyright (C) 2022-2023 Nordix Foundation. + * Modifications Copyright (C) 2022-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,18 +26,26 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import com.att.nsa.cambria.client.CambriaClientBuilders; import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; import com.att.nsa.cambria.client.CambriaConsumer; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor; import java.io.IOException; import java.net.MalformedURLException; +import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import lombok.Data; import lombok.Getter; +import lombok.NoArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -45,6 +53,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; import org.jetbrains.annotations.NotNull; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; @@ -245,6 +254,8 @@ public interface BusConsumer { protected KafkaConsumer consumer; protected Properties kafkaProps; + protected boolean allowTracing; + /** * Kafka Consumer Wrapper. * BusTopicParam - object contains the following parameters @@ -278,6 +289,12 @@ public interface BusConsumer { if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) { kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup()); } + if (busTopicParams.isAllowTracing()) { + this.allowTracing = true; + kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + TracingConsumerInterceptor.class.getName()); + } + consumer = new KafkaConsumer<>(kafkaProps); //Subscribe to the topic consumer.subscribe(List.of(busTopicParams.getTopic())); @@ -291,6 +308,10 @@ public interface BusConsumer { } List messages = new ArrayList<>(records.count()); try { + if (allowTracing) { + createParentTraceContext(records); + } + for (TopicPartition partition : records.partitions()) { List> partitionRecords = records.records(partition); for (ConsumerRecord partitionRecord : partitionRecords) { @@ -307,6 +328,43 @@ public interface BusConsumer { return messages; } + private void createParentTraceContext(ConsumerRecords records) { + TraceParentInfo traceParentInfo = new TraceParentInfo(); + for (ConsumerRecord consumerRecord : records) { + + Headers consumerRecordHeaders = consumerRecord.headers(); + traceParentInfo = processTraceParentHeader(consumerRecordHeaders); + } + + SpanContext spanContext = SpanContext.createFromRemoteParent( + traceParentInfo.getTraceId(), traceParentInfo.getSpanId(), + TraceFlags.getSampled(), TraceState.builder().build()); + + Context.current().with(Span.wrap(spanContext)).makeCurrent(); + } + + private TraceParentInfo processTraceParentHeader(Headers headers) { + TraceParentInfo traceParentInfo = new TraceParentInfo(); + if (headers.lastHeader("traceparent") != null) { + traceParentInfo.setParentTraceId(new String(headers.lastHeader( + "traceparent").value(), StandardCharsets.UTF_8)); + + String[] parts = traceParentInfo.getParentTraceId().split("-"); + traceParentInfo.setTraceId(parts[1]); + traceParentInfo.setSpanId(parts[2]); + } + + return traceParentInfo; + } + + @Data + @NoArgsConstructor + private static class TraceParentInfo { + private String parentTraceId; + private String traceId; + private String spanId; + } + @Override public void close() { super.close(); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index ef8e1742..def8f841 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -26,6 +26,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import com.att.nsa.apiClient.http.HttpClient.ConnectionType; import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.att.nsa.cambria.client.CambriaClientBuilders; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor; import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -193,6 +194,11 @@ public interface BusPublisher { kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); } + if (busTopicParams.isAllowTracing()) { + kafkaProps.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, + TracingProducerInterceptor.class.getName()); + } + producer = new KafkaProducer<>(kafkaProps); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java index 67ee84e5..f8236d3d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java @@ -45,6 +45,11 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { */ protected boolean useHttps; + /** + * Allow tracing. + */ + protected boolean allowTracing; + /** * allow self signed certificates. */ @@ -58,6 +63,7 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { * apiKey API Key * apiSecret API Secret * useHttps does connection use HTTPS? + * allowTracing Is tracing allowed? * allowSelfSignedCerts are self-signed certificates allow * @param busTopicParams holds all our parameters * @throws IllegalArgumentException if invalid parameters are present @@ -67,6 +73,7 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { this.apiKey = busTopicParams.getApiKey(); this.apiSecret = busTopicParams.getApiSecret(); this.useHttps = busTopicParams.isUseHttps(); + this.allowTracing = busTopicParams.isAllowTracing(); this.allowSelfSignedCerts = busTopicParams.isAllowSelfSignedCerts(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java index d6fa21b6..7cc8f8b6 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java @@ -48,6 +48,7 @@ import org.apache.commons.lang3.StringUtils; * longitude DME2 Longitude * additionalProps Additional properties to pass to DME2 * useHttps does connection use HTTPS? + * allowTracing is message tracing allowed? * allowSelfSignedCerts are self-signed certificates allow */ @Getter @@ -66,6 +67,7 @@ public class BusTopicParams { private int fetchTimeout; private int fetchLimit; private boolean useHttps; + private boolean allowTracing; private boolean allowSelfSignedCerts; private boolean managed; @@ -243,6 +245,11 @@ public class BusTopicParams { return this; } + public TopicParamsBuilder allowTracing(boolean allowTracing) { + this.params.allowTracing = allowTracing; + return this; + } + public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) { this.params.allowSelfSignedCerts = allowSelfSignedCerts; return this; diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java index 02626d34..7c740abf 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java @@ -65,6 +65,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi * apiSecret api secret * partitionId partition id * useHttps does connection use HTTPS? + * allowTracing is tracing allowed? * allowSelfSignedCerts are self-signed certificates allow * * @throws IllegalArgumentException if invalid parameters are passed in */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java index a7a692de..771efb33 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -59,6 +59,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop * longitude DME2 Longitude * additionalProps Additional properties to pass to DME2 * useHttps does connection use HTTPS? + * allowTracing is tracing allowed? * allowSelfSignedCerts are self-signed certificates allow * @param busTopicParams Contains the above mentioned parameters * @throws IllegalArgumentException An invalid parameter passed in @@ -92,6 +93,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop .userName(this.userName) .password(this.password) .useHttps(this.useHttps) + .allowTracing(this.allowTracing) .allowSelfSignedCerts(this.allowSelfSignedCerts) .build()); } else { @@ -107,6 +109,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop .longitude(this.longitude) .additionalProps(this.additionalProps) .useHttps(this.useHttps) + .allowTracing(this.allowTracing) .build()); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java index f605de92..6354f762 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java @@ -63,6 +63,7 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop .servers(this.servers) .topic(this.effectiveTopic) .useHttps(this.useHttps) + .allowTracing(this.allowTracing) .additionalProps(this.additionalProps) .build()); logger.info("{}: KAFKA SINK created", this); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java index f905bd7d..896cb3bb 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java @@ -47,6 +47,7 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi * apiSecret the api secret (optional) * partitionId the partition key (optional, autogenerated if not provided) * useHttps does connection use HTTPS? + * allowTracing is tracing allowed? * allowSelfSignedCerts are self-signed certificates allow * @param busTopicParams contains attributes needed * @throws IllegalArgumentException if invalid arguments are detected @@ -67,6 +68,7 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi .apiKey(this.apiKey) .apiSecret(this.apiSecret) .useHttps(this.useHttps) + .allowTracing(this.allowTracing) .allowSelfSignedCerts(this.allowSelfSignedCerts) .build()); logger.info("{}: UEB SINK created", this); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index 09ce5261..26960379 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -93,7 +93,8 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource .consumerInstance(this.consumerInstance) .fetchTimeout(this.fetchTimeout) .fetchLimit(this.fetchLimit) - .useHttps(this.useHttps); + .useHttps(this.useHttps) + .allowTracing(this.allowTracing); if (anyNullOrEmpty(this.userName, this.password)) { this.consumer = new BusConsumer.CambriaConsumerWrapper(builder diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java index 713b4fd1..869273f0 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java @@ -57,7 +57,8 @@ public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource .topic(this.effectiveTopic) .fetchTimeout(this.fetchTimeout) .consumerGroup(this.consumerGroup) - .useHttps(this.useHttps); + .useHttps(this.useHttps) + .allowTracing(this.allowTracing); this.consumer = new BusConsumer.KafkaConsumerWrapper(builder .additionalProps(this.additionalProps) diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java index d8703c42..ead04594 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java @@ -56,6 +56,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i .fetchTimeout(this.fetchTimeout) .fetchLimit(this.fetchLimit) .useHttps(this.useHttps) + .allowTracing(this.allowTracing) .allowSelfSignedCerts(this.allowSelfSignedCerts).build()); } -- cgit 1.2.3-korg