From 0241c0aa14447c99fccecc61e91b35051d6743be Mon Sep 17 00:00:00 2001 From: "saul.gill" Date: Mon, 22 Jan 2024 11:59:07 +0000 Subject: Addition of tracing for Kafka Added open telemetry-based interceptors Messages will be tagged with tracing information Issue-ID: POLICY-4922 Change-Id: If4234a642c3eb7dd6c3acaf2f06b2efb2ddef8af Signed-off-by: saul.gill --- .../endpoints/event/comm/bus/TopicTestBase.java | 7 +- .../event/comm/bus/internal/BusConsumerTest.java | 98 +++++++++++++++++++++- 2 files changed, 101 insertions(+), 4 deletions(-) (limited to 'policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm') diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java index bd88eec9..00111fb2 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -125,7 +126,7 @@ public class TopicTestBase { .fetchLimit(MY_FETCH_LIMIT).fetchTimeout(MY_FETCH_TIMEOUT).hostname(MY_HOST).latitude(MY_LAT) .longitude(MY_LONG).managed(true).partitionId(MY_PARTITION).partner(MY_PARTNER) .password(MY_PASS).port(MY_PORT).servers(servers).topic(MY_TOPIC) - .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME) + .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).allowTracing(true).userName(MY_USERNAME) .serializationProvider(MY_SERIALIZER); } @@ -156,8 +157,8 @@ public class TopicTestBase { return BusTopicParams.builder().additionalProps(addProps).basePath(MY_BASE_PATH).clientName(MY_CLIENT_NAME) .consumerGroup(MY_CONS_GROUP).consumerInstance(MY_CONS_INST).environment(MY_ENV) - .hostname(MY_HOST).partitionId(MY_PARTITION).partner(MY_PARTNER) + .hostname(MY_HOST).partitionId(MY_PARTITION).partner(MY_PARTNER).fetchTimeout(MY_FETCH_TIMEOUT) .port(KAFKA_PORT).servers(servers).topic(MY_TOPIC) - .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(false); + .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(false).allowTracing(true); } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java index 86b32e69..a95e773d 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java @@ -3,7 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2023 Nordix Foundation. + * Modifications Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,21 +28,32 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.att.nsa.cambria.client.CambriaConsumer; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import org.apache.commons.collections4.IteratorUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; @@ -60,12 +71,17 @@ public class BusConsumerTest extends TopicTestBase { private static final int SHORT_TIMEOUT_MILLIS = 10; private static final int LONG_TIMEOUT_MILLIS = 3000; + @Mock + KafkaConsumer mockedKafkaConsumer; + @Before @Override public void setUp() { super.setUp(); + MockitoAnnotations.initMocks(this); } + @Test public void testFetchingBusConsumer() { // should not be negative @@ -336,6 +352,86 @@ public class BusConsumerTest extends TopicTestBase { consumer.close(); } + @Test + public void testFetchNoMessages() throws IOException { + KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + kafkaConsumerWrapper.consumer = mockedKafkaConsumer; + + when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + + Iterable result = kafkaConsumerWrapper.fetch(); + + verify(mockedKafkaConsumer, times(1)).poll(any()); + + assertThat(result != null); + + assertThat(!result.iterator().hasNext()); + + mockedKafkaConsumer.close(); + } + + @Test + public void testFetchWithMessages() { + // Setup + KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + kafkaConsumerWrapper.consumer = mockedKafkaConsumer; + + ConsumerRecord record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value"); + Map>> recordsMap = new HashMap<>(); + recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record)); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords); + + Iterable result = kafkaConsumerWrapper.fetch(); + + verify(mockedKafkaConsumer, times(1)).poll(any()); + + verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class)); + + assertThat(result != null); + + assertThat(result.iterator().hasNext()); + + assertThat(result.iterator().next().equals("value")); + + mockedKafkaConsumer.close(); + } + + @Test + public void testFetchWithMessagesAndTraceparent() { + // Setup + KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + kafkaConsumerWrapper.consumer = mockedKafkaConsumer; + + ConsumerRecord record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value"); + record.headers().add( + "traceparent", + "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8) + ); + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record)); + ConsumerRecords consumerRecords = new ConsumerRecords<>(recordsMap); + + when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords); + + Iterable result = kafkaConsumerWrapper.fetch(); + + verify(mockedKafkaConsumer, times(1)).poll(any()); + + verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class)); + + assertThat(result != null); + + assertThat(result.iterator().hasNext()); + + assertThat(result.iterator().next().equals("value")); + + mockedKafkaConsumer.close(); + } + + @Test public void testKafkaConsumerWrapperClose() { assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException(); -- cgit 1.2.3-korg