From 0241c0aa14447c99fccecc61e91b35051d6743be Mon Sep 17 00:00:00 2001 From: "saul.gill" Date: Mon, 22 Jan 2024 11:59:07 +0000 Subject: Addition of tracing for Kafka Added open telemetry-based interceptors Messages will be tagged with tracing information Issue-ID: POLICY-4922 Change-Id: If4234a642c3eb7dd6c3acaf2f06b2efb2ddef8af Signed-off-by: saul.gill --- policy-endpoints/pom.xml | 15 ++++ .../event/comm/bus/internal/BusConsumer.java | 62 +++++++++++++- .../event/comm/bus/internal/BusPublisher.java | 6 ++ .../event/comm/bus/internal/BusTopicBase.java | 7 ++ .../event/comm/bus/internal/BusTopicParams.java | 7 ++ .../comm/bus/internal/InlineBusTopicSink.java | 1 + .../comm/bus/internal/InlineDmaapTopicSink.java | 3 + .../comm/bus/internal/InlineKafkaTopicSink.java | 1 + .../comm/bus/internal/InlineUebTopicSink.java | 2 + .../internal/SingleThreadedDmaapTopicSource.java | 3 +- .../internal/SingleThreadedKafkaTopicSource.java | 3 +- .../bus/internal/SingleThreadedUebTopicSource.java | 1 + .../endpoints/event/comm/bus/TopicTestBase.java | 7 +- .../event/comm/bus/internal/BusConsumerTest.java | 98 +++++++++++++++++++++- .../event/comm/TopicEndpointProxyTest.json | 4 + .../event/comm/bus/internal/BusTopicBaseTest.json | 1 + .../comm/bus/internal/InlineBusTopicSinkTest.json | 1 + .../bus/internal/InlineDmaapTopicSinkTest.json | 1 + .../bus/internal/InlineKafkaTopicSinkTest.json | 1 + .../comm/bus/internal/InlineUebTopicSinkTest.json | 1 + .../internal/SingleThreadedBusTopicSourceTest.json | 1 + .../SingleThreadedDmaapTopicSourceTest.json | 1 + .../SingleThreadedKafkaTopicSourceTest.json | 1 + .../internal/SingleThreadedUebTopicSourceTest.json | 1 + .../parameters/TopicParameters_all_params.json | 2 + 25 files changed, 223 insertions(+), 8 deletions(-) diff --git a/policy-endpoints/pom.xml b/policy-endpoints/pom.xml index c79e9401..4b342b9d 100644 --- a/policy-endpoints/pom.xml +++ b/policy-endpoints/pom.xml @@ -183,5 +183,20 @@ io.swagger.core.v3 swagger-annotations + + io.opentelemetry.instrumentation + opentelemetry-kafka-clients-2.6 + 1.25.0-alpha + + + io.opentelemetry + opentelemetry-exporter-otlp + 1.25.0 + + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure + 1.25.0-alpha + diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 79e374a2..d6fa0645 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -5,7 +5,7 @@ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved. - * Modifications Copyright (C) 2022-2023 Nordix Foundation. + * Modifications Copyright (C) 2022-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,18 +26,26 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import com.att.nsa.cambria.client.CambriaClientBuilders; import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; import com.att.nsa.cambria.client.CambriaConsumer; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor; import java.io.IOException; import java.net.MalformedURLException; +import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import lombok.Data; import lombok.Getter; +import lombok.NoArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -45,6 +53,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Headers; import org.jetbrains.annotations.NotNull; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; @@ -245,6 +254,8 @@ public interface BusConsumer { protected KafkaConsumer consumer; protected Properties kafkaProps; + protected boolean allowTracing; + /** * Kafka Consumer Wrapper. * BusTopicParam - object contains the following parameters @@ -278,6 +289,12 @@ public interface BusConsumer { if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) { kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup()); } + if (busTopicParams.isAllowTracing()) { + this.allowTracing = true; + kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + TracingConsumerInterceptor.class.getName()); + } + consumer = new KafkaConsumer<>(kafkaProps); //Subscribe to the topic consumer.subscribe(List.of(busTopicParams.getTopic())); @@ -291,6 +308,10 @@ public interface BusConsumer { } List messages = new ArrayList<>(records.count()); try { + if (allowTracing) { + createParentTraceContext(records); + } + for (TopicPartition partition : records.partitions()) { List> partitionRecords = records.records(partition); for (ConsumerRecord partitionRecord : partitionRecords) { @@ -307,6 +328,43 @@ public interface BusConsumer { return messages; } + private void createParentTraceContext(ConsumerRecords records) { + TraceParentInfo traceParentInfo = new TraceParentInfo(); + for (ConsumerRecord consumerRecord : records) { + + Headers consumerRecordHeaders = consumerRecord.headers(); + traceParentInfo = processTraceParentHeader(consumerRecordHeaders); + } + + SpanContext spanContext = SpanContext.createFromRemoteParent( + traceParentInfo.getTraceId(), traceParentInfo.getSpanId(), + TraceFlags.getSampled(), TraceState.builder().build()); + + Context.current().with(Span.wrap(spanContext)).makeCurrent(); + } + + private TraceParentInfo processTraceParentHeader(Headers headers) { + TraceParentInfo traceParentInfo = new TraceParentInfo(); + if (headers.lastHeader("traceparent") != null) { + traceParentInfo.setParentTraceId(new String(headers.lastHeader( + "traceparent").value(), StandardCharsets.UTF_8)); + + String[] parts = traceParentInfo.getParentTraceId().split("-"); + traceParentInfo.setTraceId(parts[1]); + traceParentInfo.setSpanId(parts[2]); + } + + return traceParentInfo; + } + + @Data + @NoArgsConstructor + private static class TraceParentInfo { + private String parentTraceId; + private String traceId; + private String spanId; + } + @Override public void close() { super.close(); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index ef8e1742..def8f841 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -26,6 +26,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import com.att.nsa.apiClient.http.HttpClient.ConnectionType; import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.att.nsa.cambria.client.CambriaClientBuilders; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor; import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -193,6 +194,11 @@ public interface BusPublisher { kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER); } + if (busTopicParams.isAllowTracing()) { + kafkaProps.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, + TracingProducerInterceptor.class.getName()); + } + producer = new KafkaProducer<>(kafkaProps); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java index 67ee84e5..f8236d3d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java @@ -45,6 +45,11 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { */ protected boolean useHttps; + /** + * Allow tracing. + */ + protected boolean allowTracing; + /** * allow self signed certificates. */ @@ -58,6 +63,7 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { * apiKey API Key * apiSecret API Secret * useHttps does connection use HTTPS? + * allowTracing Is tracing allowed? * allowSelfSignedCerts are self-signed certificates allow * @param busTopicParams holds all our parameters * @throws IllegalArgumentException if invalid parameters are present @@ -67,6 +73,7 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled { this.apiKey = busTopicParams.getApiKey(); this.apiSecret = busTopicParams.getApiSecret(); this.useHttps = busTopicParams.isUseHttps(); + this.allowTracing = busTopicParams.isAllowTracing(); this.allowSelfSignedCerts = busTopicParams.isAllowSelfSignedCerts(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java index d6fa21b6..7cc8f8b6 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java @@ -48,6 +48,7 @@ import org.apache.commons.lang3.StringUtils; * longitude DME2 Longitude * additionalProps Additional properties to pass to DME2 * useHttps does connection use HTTPS? + * allowTracing is message tracing allowed? * allowSelfSignedCerts are self-signed certificates allow */ @Getter @@ -66,6 +67,7 @@ public class BusTopicParams { private int fetchTimeout; private int fetchLimit; private boolean useHttps; + private boolean allowTracing; private boolean allowSelfSignedCerts; private boolean managed; @@ -243,6 +245,11 @@ public class BusTopicParams { return this; } + public TopicParamsBuilder allowTracing(boolean allowTracing) { + this.params.allowTracing = allowTracing; + return this; + } + public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) { this.params.allowSelfSignedCerts = allowSelfSignedCerts; return this; diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java index 02626d34..7c740abf 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java @@ -65,6 +65,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi * apiSecret api secret * partitionId partition id * useHttps does connection use HTTPS? + * allowTracing is tracing allowed? * allowSelfSignedCerts are self-signed certificates allow * * @throws IllegalArgumentException if invalid parameters are passed in */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java index a7a692de..771efb33 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -59,6 +59,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop * longitude DME2 Longitude * additionalProps Additional properties to pass to DME2 * useHttps does connection use HTTPS? + * allowTracing is tracing allowed? * allowSelfSignedCerts are self-signed certificates allow * @param busTopicParams Contains the above mentioned parameters * @throws IllegalArgumentException An invalid parameter passed in @@ -92,6 +93,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop .userName(this.userName) .password(this.password) .useHttps(this.useHttps) + .allowTracing(this.allowTracing) .allowSelfSignedCerts(this.allowSelfSignedCerts) .build()); } else { @@ -107,6 +109,7 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop .longitude(this.longitude) .additionalProps(this.additionalProps) .useHttps(this.useHttps) + .allowTracing(this.allowTracing) .build()); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java index f605de92..6354f762 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java @@ -63,6 +63,7 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop .servers(this.servers) .topic(this.effectiveTopic) .useHttps(this.useHttps) + .allowTracing(this.allowTracing) .additionalProps(this.additionalProps) .build()); logger.info("{}: KAFKA SINK created", this); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java index f905bd7d..896cb3bb 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java @@ -47,6 +47,7 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi * apiSecret the api secret (optional) * partitionId the partition key (optional, autogenerated if not provided) * useHttps does connection use HTTPS? + * allowTracing is tracing allowed? * allowSelfSignedCerts are self-signed certificates allow * @param busTopicParams contains attributes needed * @throws IllegalArgumentException if invalid arguments are detected @@ -67,6 +68,7 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi .apiKey(this.apiKey) .apiSecret(this.apiSecret) .useHttps(this.useHttps) + .allowTracing(this.allowTracing) .allowSelfSignedCerts(this.allowSelfSignedCerts) .build()); logger.info("{}: UEB SINK created", this); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index 09ce5261..26960379 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -93,7 +93,8 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource .consumerInstance(this.consumerInstance) .fetchTimeout(this.fetchTimeout) .fetchLimit(this.fetchLimit) - .useHttps(this.useHttps); + .useHttps(this.useHttps) + .allowTracing(this.allowTracing); if (anyNullOrEmpty(this.userName, this.password)) { this.consumer = new BusConsumer.CambriaConsumerWrapper(builder diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java index 713b4fd1..869273f0 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java @@ -57,7 +57,8 @@ public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource .topic(this.effectiveTopic) .fetchTimeout(this.fetchTimeout) .consumerGroup(this.consumerGroup) - .useHttps(this.useHttps); + .useHttps(this.useHttps) + .allowTracing(this.allowTracing); this.consumer = new BusConsumer.KafkaConsumerWrapper(builder .additionalProps(this.additionalProps) diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java index d8703c42..ead04594 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java @@ -56,6 +56,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i .fetchTimeout(this.fetchTimeout) .fetchLimit(this.fetchLimit) .useHttps(this.useHttps) + .allowTracing(this.allowTracing) .allowSelfSignedCerts(this.allowSelfSignedCerts).build()); } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java index bd88eec9..00111fb2 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -125,7 +126,7 @@ public class TopicTestBase { .fetchLimit(MY_FETCH_LIMIT).fetchTimeout(MY_FETCH_TIMEOUT).hostname(MY_HOST).latitude(MY_LAT) .longitude(MY_LONG).managed(true).partitionId(MY_PARTITION).partner(MY_PARTNER) .password(MY_PASS).port(MY_PORT).servers(servers).topic(MY_TOPIC) - .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME) + .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).allowTracing(true).userName(MY_USERNAME) .serializationProvider(MY_SERIALIZER); } @@ -156,8 +157,8 @@ public class TopicTestBase { return BusTopicParams.builder().additionalProps(addProps).basePath(MY_BASE_PATH).clientName(MY_CLIENT_NAME) .consumerGroup(MY_CONS_GROUP).consumerInstance(MY_CONS_INST).environment(MY_ENV) - .hostname(MY_HOST).partitionId(MY_PARTITION).partner(MY_PARTNER) + .hostname(MY_HOST).partitionId(MY_PARTITION).partner(MY_PARTNER).fetchTimeout(MY_FETCH_TIMEOUT) .port(KAFKA_PORT).servers(servers).topic(MY_TOPIC) - .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(false); + .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(false).allowTracing(true); } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java index 86b32e69..a95e773d 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java @@ -3,7 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2023 Nordix Foundation. + * Modifications Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,21 +28,32 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.att.nsa.cambria.client.CambriaConsumer; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import org.apache.commons.collections4.IteratorUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; @@ -60,12 +71,17 @@ public class BusConsumerTest extends TopicTestBase { private static final int SHORT_TIMEOUT_MILLIS = 10; private static final int LONG_TIMEOUT_MILLIS = 3000; + @Mock + KafkaConsumer mockedKafkaConsumer; + @Before @Override public void setUp() { super.setUp(); + MockitoAnnotations.initMocks(this); } + @Test public void testFetchingBusConsumer() { // should not be negative @@ -336,6 +352,86 @@ public class BusConsumerTest extends TopicTestBase { consumer.close(); } + @Test + public void testFetchNoMessages() throws IOException { + KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + kafkaConsumerWrapper.consumer = mockedKafkaConsumer; + + when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + + Iterable result = kafkaConsumerWrapper.fetch(); + + verify(mockedKafkaConsumer, times(1)).poll(any()); + + assertThat(result != null); + + assertThat(!result.iterator().hasNext()); + + mockedKafkaConsumer.close(); + } + + @Test + public void testFetchWithMessages() { + // Setup + KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + kafkaConsumerWrapper.consumer = mockedKafkaConsumer; + + ConsumerRecord record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value"); + Map>> recordsMap = new HashMap<>(); + recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record)); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords); + + Iterable result = kafkaConsumerWrapper.fetch(); + + verify(mockedKafkaConsumer, times(1)).poll(any()); + + verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class)); + + assertThat(result != null); + + assertThat(result.iterator().hasNext()); + + assertThat(result.iterator().next().equals("value")); + + mockedKafkaConsumer.close(); + } + + @Test + public void testFetchWithMessagesAndTraceparent() { + // Setup + KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + kafkaConsumerWrapper.consumer = mockedKafkaConsumer; + + ConsumerRecord record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value"); + record.headers().add( + "traceparent", + "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8) + ); + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record)); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords); + + Iterable result = kafkaConsumerWrapper.fetch(); + + verify(mockedKafkaConsumer, times(1)).poll(any()); + + verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class)); + + assertThat(result != null); + + assertThat(result.iterator().hasNext()); + + assertThat(result.iterator().next().equals("value")); + + mockedKafkaConsumer.close(); + } + + @Test public void testKafkaConsumerWrapperClose() { assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException(); diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json index 9c8d78e5..1f520456 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json @@ -11,6 +11,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": false, "allowSelfSignedCerts" : true, "consumerGroup" : "${obj.topicSources[0].consumerGroup}", "consumerInstance" : "${obj.topicSources[0].consumerInstance}", @@ -27,6 +28,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": false, "allowSelfSignedCerts" : true, "consumerGroup" : "my-cons-group", "consumerInstance" : "my-cons-inst", @@ -52,6 +54,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": false, "allowSelfSignedCerts" : true, "topicCommInfrastructure" : "UEB", "partitionKey" : "${obj.topicSinks[0].partitionKey}" @@ -65,6 +68,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": false, "allowSelfSignedCerts" : true, "topicCommInfrastructure" : "DMAAP", "partitionKey" : "my-partition" diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.json index dd0b8924..462278a4 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "topicCommInfrastructure" : "NOOP" } diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.json index 8e39fddf..1f2fb55f 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "topicCommInfrastructure" : "NOOP", "partitionKey" : "my-partition" diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.json index 2bed3347..0f58e9b2 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "topicCommInfrastructure" : "DMAAP", "partitionKey" : "my-partition" diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json index 42b7a036..dc1f1f75 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json @@ -9,6 +9,7 @@ "alive": false, "locked": false, "useHttps": false, + "allowTracing": false, "topicCommInfrastructure": "KAFKA", "partitionKey": "my-partition", "additionalProps": { diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json index e7419d88..6dda9b9e 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "topicCommInfrastructure" : "UEB", "partitionKey" : "my-partition" diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.json index aeb233bc..305620c8 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "consumerGroup" : "my-cons-group", "consumerInstance" : "my-cons-inst", diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.json index 07aa916a..a3fc8b86 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "consumerGroup" : "my-cons-group", "consumerInstance" : "my-cons-inst", diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json index 38cc2f8e..a101d235 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json @@ -9,6 +9,7 @@ "alive": false, "locked": false, "useHttps": false, + "allowTracing": false, "topicCommInfrastructure": "KAFKA", "additionalProps": { "security.protocol": "SASL_PLAINTEXT", diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json index 6a6e8f52..13ee6bc6 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "consumerGroup" : "my-cons-group", "consumerInstance" : "my-cons-inst", diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/parameters/TopicParameters_all_params.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/parameters/TopicParameters_all_params.json index 7d9cce7b..de9487be 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/parameters/TopicParameters_all_params.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/parameters/TopicParameters_all_params.json @@ -8,6 +8,7 @@ "apiSecret" : "my-api-secret", "port": 123, "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "consumerGroup" : "consumer group", "consumerInstance" : "consumer instance", @@ -37,6 +38,7 @@ "apiSecret" : "my-api-secret", "port": 123, "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "consumerGroup" : "consumer group", "consumerInstance" : "consumer instance", -- cgit 1.2.3-korg