diff options
Diffstat (limited to 'policy-endpoints/src/test')
13 files changed, 116 insertions, 4 deletions
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java index bd88eec9..00111fb2 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -125,7 +126,7 @@ public class TopicTestBase { .fetchLimit(MY_FETCH_LIMIT).fetchTimeout(MY_FETCH_TIMEOUT).hostname(MY_HOST).latitude(MY_LAT) .longitude(MY_LONG).managed(true).partitionId(MY_PARTITION).partner(MY_PARTNER) .password(MY_PASS).port(MY_PORT).servers(servers).topic(MY_TOPIC) - .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME) + .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).allowTracing(true).userName(MY_USERNAME) .serializationProvider(MY_SERIALIZER); } @@ -156,8 +157,8 @@ public class TopicTestBase { return BusTopicParams.builder().additionalProps(addProps).basePath(MY_BASE_PATH).clientName(MY_CLIENT_NAME) .consumerGroup(MY_CONS_GROUP).consumerInstance(MY_CONS_INST).environment(MY_ENV) - .hostname(MY_HOST).partitionId(MY_PARTITION).partner(MY_PARTNER) + .hostname(MY_HOST).partitionId(MY_PARTITION).partner(MY_PARTNER).fetchTimeout(MY_FETCH_TIMEOUT) .port(KAFKA_PORT).servers(servers).topic(MY_TOPIC) - .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(false); + .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(false).allowTracing(true); } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java index 86b32e69..a95e773d 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java @@ -3,7 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2023 Nordix Foundation. + * Modifications Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,21 +28,32 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.att.nsa.cambria.client.CambriaConsumer; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import org.apache.commons.collections4.IteratorUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; @@ -60,12 +71,17 @@ public class BusConsumerTest extends TopicTestBase { private static final int SHORT_TIMEOUT_MILLIS = 10; private static final int LONG_TIMEOUT_MILLIS = 3000; + @Mock + KafkaConsumer<String, String> mockedKafkaConsumer; + @Before @Override public void setUp() { super.setUp(); + MockitoAnnotations.initMocks(this); } + @Test public void testFetchingBusConsumer() { // should not be negative @@ -337,6 +353,86 @@ public class BusConsumerTest extends TopicTestBase { } @Test + public void testFetchNoMessages() throws IOException { + KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + kafkaConsumerWrapper.consumer = mockedKafkaConsumer; + + when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + + Iterable<String> result = kafkaConsumerWrapper.fetch(); + + verify(mockedKafkaConsumer, times(1)).poll(any()); + + assertThat(result != null); + + assertThat(!result.iterator().hasNext()); + + mockedKafkaConsumer.close(); + } + + @Test + public void testFetchWithMessages() { + // Setup + KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + kafkaConsumerWrapper.consumer = mockedKafkaConsumer; + + ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value"); + Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>(); + recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record)); + ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap); + + when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords); + + Iterable<String> result = kafkaConsumerWrapper.fetch(); + + verify(mockedKafkaConsumer, times(1)).poll(any()); + + verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class)); + + assertThat(result != null); + + assertThat(result.iterator().hasNext()); + + assertThat(result.iterator().next().equals("value")); + + mockedKafkaConsumer.close(); + } + + @Test + public void testFetchWithMessagesAndTraceparent() { + // Setup + KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + kafkaConsumerWrapper.consumer = mockedKafkaConsumer; + + ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value"); + record.headers().add( + "traceparent", + "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8) + ); + + Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>(); + recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record)); + ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap); + + when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords); + + Iterable<String> result = kafkaConsumerWrapper.fetch(); + + verify(mockedKafkaConsumer, times(1)).poll(any()); + + verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class)); + + assertThat(result != null); + + assertThat(result.iterator().hasNext()); + + assertThat(result.iterator().next().equals("value")); + + mockedKafkaConsumer.close(); + } + + + @Test public void testKafkaConsumerWrapperClose() { assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException(); } diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json index 9c8d78e5..1f520456 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxyTest.json @@ -11,6 +11,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": false, "allowSelfSignedCerts" : true, "consumerGroup" : "${obj.topicSources[0].consumerGroup}", "consumerInstance" : "${obj.topicSources[0].consumerInstance}", @@ -27,6 +28,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": false, "allowSelfSignedCerts" : true, "consumerGroup" : "my-cons-group", "consumerInstance" : "my-cons-inst", @@ -52,6 +54,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": false, "allowSelfSignedCerts" : true, "topicCommInfrastructure" : "UEB", "partitionKey" : "${obj.topicSinks[0].partitionKey}" @@ -65,6 +68,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": false, "allowSelfSignedCerts" : true, "topicCommInfrastructure" : "DMAAP", "partitionKey" : "my-partition" diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.json index dd0b8924..462278a4 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "topicCommInfrastructure" : "NOOP" } diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.json index 8e39fddf..1f2fb55f 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "topicCommInfrastructure" : "NOOP", "partitionKey" : "my-partition" diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.json index 2bed3347..0f58e9b2 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "topicCommInfrastructure" : "DMAAP", "partitionKey" : "my-partition" diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json index 42b7a036..dc1f1f75 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json @@ -9,6 +9,7 @@ "alive": false, "locked": false, "useHttps": false, + "allowTracing": false, "topicCommInfrastructure": "KAFKA", "partitionKey": "my-partition", "additionalProps": { diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json index e7419d88..6dda9b9e 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "topicCommInfrastructure" : "UEB", "partitionKey" : "my-partition" diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.json index aeb233bc..305620c8 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "consumerGroup" : "my-cons-group", "consumerInstance" : "my-cons-inst", diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.json index 07aa916a..a3fc8b86 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "consumerGroup" : "my-cons-group", "consumerInstance" : "my-cons-inst", diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json index 38cc2f8e..a101d235 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json @@ -9,6 +9,7 @@ "alive": false, "locked": false, "useHttps": false, + "allowTracing": false, "topicCommInfrastructure": "KAFKA", "additionalProps": { "security.protocol": "SASL_PLAINTEXT", diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json index 6a6e8f52..13ee6bc6 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.json @@ -8,6 +8,7 @@ "apiKey" : "my-api-key", "apiSecret" : "my-api-secret", "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "consumerGroup" : "my-cons-group", "consumerInstance" : "my-cons-inst", diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/parameters/TopicParameters_all_params.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/parameters/TopicParameters_all_params.json index 7d9cce7b..de9487be 100644 --- a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/parameters/TopicParameters_all_params.json +++ b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/parameters/TopicParameters_all_params.json @@ -8,6 +8,7 @@ "apiSecret" : "my-api-secret", "port": 123, "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "consumerGroup" : "consumer group", "consumerInstance" : "consumer instance", @@ -37,6 +38,7 @@ "apiSecret" : "my-api-secret", "port": 123, "useHttps" : true, + "allowTracing": true, "allowSelfSignedCerts" : true, "consumerGroup" : "consumer group", "consumerInstance" : "consumer instance", |