aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal')
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java346
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java218
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java5
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java41
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java17
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.java81
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java (renamed from policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java)22
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java52
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.java98
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java (renamed from policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java)17
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java19
11 files changed, 295 insertions, 621 deletions
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
index 0255c100..2c33a257 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
@@ -2,7 +2,8 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,218 +21,257 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.att.nsa.cambria.client.CambriaConsumer;
import java.io.IOException;
-import java.util.Arrays;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
-import org.apache.commons.collections4.IteratorUtils;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
-import org.powermock.reflect.Whitebox;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
public class BusConsumerTest extends TopicTestBase {
+ private static final int SHORT_TIMEOUT_MILLIS = 10;
+ private static final int LONG_TIMEOUT_MILLIS = 3000;
+
+ @Mock
+ KafkaConsumer<String, String> mockedKafkaConsumer;
+
+ AutoCloseable closeable;
+
@Before
@Override
public void setUp() {
super.setUp();
+ closeable = MockitoAnnotations.openMocks(this);
}
- @Test
- public void testCambriaConsumerWrapper() {
- // verify that different wrappers can be built
- new CambriaConsumerWrapper(makeBuilder().build());
- new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
- new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
- new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
- new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
- new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
- new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
- new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
- new CambriaConsumerWrapper(makeBuilder().userName(null).build());
- new CambriaConsumerWrapper(makeBuilder().password(null).build());
- new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build());
+ @After
+ public void tearDown() throws Exception {
+ closeable.close();
}
- @Test
- public void testCambriaConsumerWrapperFetch() throws Exception {
- CambriaConsumer inner = mock(CambriaConsumer.class);
- List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
- when(inner.fetch()).thenReturn(lst);
-
- CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
- Whitebox.setInternalState(cons, "consumer", inner);
-
- assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
- // arrange to throw exception next time fetch is called
- IOException ex = new IOException(EXPECTED);
- when(inner.fetch()).thenThrow(ex);
-
- cons.fetchTimeout = 10;
-
- try {
- cons.fetch();
- fail("missing exception");
-
- } catch (IOException e) {
- assertEquals(ex, e);
- }
+ @Test
+ public void testFetchingBusConsumer() {
+ // should not be negative
+ var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
+ assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+
+ // should not be zero
+ cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
+ assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+
+ // should not be too large
+ cons = new FetchingBusConsumerImpl(
+ makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
+ assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+
+ // should not be what was specified
+ cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
+ assertThat(cons.getSleepTime()).isEqualTo(100);
}
@Test
- public void testCambriaConsumerWrapperClose() {
- CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
-
- // set filter several times to cause different branches of close() to be executed
- for (int count = 0; count < 3; ++count) {
- cons.close();
- cons.setFilter("close=" + count);
- }
+ public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
+
+ var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
+
+ private CountDownLatch started = new CountDownLatch(1);
+
+ @Override
+ protected void sleepAfterFetchFailure() {
+ started.countDown();
+ super.sleepAfterFetchFailure();
+ }
+ };
+
+ // full sleep
+ long tstart = System.currentTimeMillis();
+ cons.sleepAfterFetchFailure();
+ assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
+
+ // close while sleeping - sleep should halt prematurely
+ cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
+ cons.started = new CountDownLatch(1);
+ Thread thread = new Thread(cons::sleepAfterFetchFailure);
+ tstart = System.currentTimeMillis();
+ thread.start();
+ cons.started.await();
+ cons.close();
+ thread.join();
+ assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
+
+ // interrupt while sleeping - sleep should halt prematurely
+ cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
+ cons.started = new CountDownLatch(1);
+ thread = new Thread(cons::sleepAfterFetchFailure);
+ tstart = System.currentTimeMillis();
+ thread.start();
+ cons.started.await();
+ thread.interrupt();
+ thread.join();
+ assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
}
@Test
- public void testCambriaConsumerWrapperSetFilter() {
- // set filter several times to cause different branches to be executed
- CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
- for (int count = 0; count < 3; ++count) {
- cons.setFilter("set-filter=" + count);
- }
+ public void testKafkaConsumerWrapper() {
+ // verify that different wrappers can be built
+ assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
}
- @Test
- public void testCambriaConsumerWrapperToString() {
- assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
+ @Test(expected = IllegalArgumentException.class)
+ public void testKafkaConsumerWrapper_InvalidTopic() {
+ new KafkaConsumerWrapper(makeBuilder().topic(null).build());
}
@Test
- public void testDmaapConsumerWrapper() throws Exception {
- // verify that different wrappers can be built
- new DmaapAafConsumerWrapper(makeBuilder().build());
+ public void testKafkaConsumerWrapperFetch() {
+
+ //Setup Properties for consumer
+ Properties kafkaProps = new Properties();
+ kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
+ kafkaProps.setProperty("enable.auto.commit", "true");
+ kafkaProps.setProperty("auto.commit.interval.ms", "1000");
+ kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+ KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
+ kafka.consumer = consumer;
+
+ assertThrows(java.lang.IllegalStateException.class, () -> kafka.fetch().iterator().hasNext());
+ consumer.close();
}
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
- new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
+ @Test
+ public void testFetchNoMessages() throws IOException {
+ KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
+ kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
+
+ when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
+
+ Iterable<String> result = kafkaConsumerWrapper.fetch();
+
+ verify(mockedKafkaConsumer, times(1)).poll(any());
+
+ assertThat(result != null);
+
+ assertThat(!result.iterator().hasNext());
+
+ mockedKafkaConsumer.close();
}
@Test
- public void testDmaapConsumerWrapperFetch() throws Exception {
- DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
- MRConsumerImpl cons = mock(MRConsumerImpl.class);
+ public void testFetchWithMessages() {
+ // Setup
+ KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
+ kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
- dmaap.fetchTimeout = 5;
- dmaap.consumer = cons;
+ ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+ recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
+ ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
- // null return
- when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
- assertFalse(dmaap.fetch().iterator().hasNext());
+ when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
- // with messages, 200
- List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
- MRConsumerResponse resp = new MRConsumerResponse();
- resp.setResponseCode("200");
- resp.setActualMessages(lst);
- when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
+ Iterable<String> result = kafkaConsumerWrapper.fetch();
- assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
+ verify(mockedKafkaConsumer, times(1)).poll(any());
- // null messages
- resp.setActualMessages(null);
- when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
+ verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
- assertFalse(dmaap.fetch().iterator().hasNext());
+ assertThat(result != null);
- // with messages, NOT 200
- resp.setResponseCode("400");
- resp.setActualMessages(lst);
- when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
+ assertThat(result.iterator().hasNext());
- assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
- }
+ assertThat(result.iterator().next().equals("value"));
- @Test
- public void testDmaapConsumerWrapperClose() throws Exception {
- new DmaapAafConsumerWrapper(makeBuilder().build()).close();
+ mockedKafkaConsumer.close();
}
@Test
- public void testDmaapConsumerWrapperToString() throws Exception {
- assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
- }
+ public void testFetchWithMessagesAndTraceparent() {
+ // Setup
+ KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
+ kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
- @Test
- public void testDmaapAafConsumerWrapper() throws Exception {
- // verify that different wrappers can be built
- new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
- new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build());
- }
+ ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
+ record.headers().add(
+ "traceparent",
+ "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8)
+ );
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
- /*
- * Unfortunately, the MR code intercepts this and throws an exception before the
- * wrapper gets a chance to check it, thus this test does not improve the coverage
- * for the constructor.
- */
- new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
- }
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
+ recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
+ ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
- @Test
- public void testDmaapAafConsumerWrapperToString() throws Exception {
- assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
- }
+ when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
- @Test
- public void testDmaapDmeConsumerWrapper() throws Exception {
- // verify that different wrappers can be built
- new DmaapDmeConsumerWrapper(makeBuilder().build());
- new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
- new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
- new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
-
- addProps.put(ROUTE_PROP, MY_ROUTE);
- new DmaapDmeConsumerWrapper(makeBuilder().build());
- new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
- }
+ Iterable<String> result = kafkaConsumerWrapper.fetch();
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
- new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
- }
+ verify(mockedKafkaConsumer, times(1)).poll(any());
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
- new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
+ verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
+
+ assertThat(result != null);
+
+ assertThat(result.iterator().hasNext());
+
+ assertThat(result.iterator().next().equals("value"));
+
+ mockedKafkaConsumer.close();
}
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
- new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
+
+ @Test
+ public void testKafkaConsumerWrapperClose() {
+ assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
}
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
- new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
+ @Test
+ public void testKafkaConsumerWrapperToString() {
+ assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
}
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
- new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
+ private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
+
+ protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
+ super(busTopicParams);
+ }
+
+ @Override
+ public Iterable<String> fetch() {
+ return null;
+ }
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java
deleted file mode 100644
index 5a933e9b..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
-import org.onap.dmaap.mr.client.response.MRPublisherResponse;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
-import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.CambriaPublisherWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapAafPublisherWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapDmePublisherWrapper;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapPublisherWrapper;
-
-public class BusPublisherTest extends TopicTestBase {
-
- @Before
- @Override
- public void setUp() {
- super.setUp();
- }
-
- @Test
- public void testCambriaPublisherWrapper() {
- // verify that different wrappers can be built
- new CambriaPublisherWrapper(makeBuilder().build());
- new CambriaPublisherWrapper(makeBuilder().useHttps(false).build());
- new CambriaPublisherWrapper(makeBuilder().useHttps(true).build());
- new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
- new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
- new CambriaPublisherWrapper(makeBuilder().apiKey(null).build());
- new CambriaPublisherWrapper(makeBuilder().apiSecret(null).build());
- new CambriaPublisherWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
- new CambriaPublisherWrapper(makeBuilder().userName(null).build());
- new CambriaPublisherWrapper(makeBuilder().password(null).build());
- new CambriaPublisherWrapper(makeBuilder().userName(null).password(null).build());
- }
-
- @Test
- public void testCambriaPublisherWrapperSend() throws Exception {
- CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class);
- CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
- cambria.publisher = pub;
-
- assertTrue(cambria.send(MY_PARTITION, MY_MESSAGE));
-
- // publisher exception
- when(pub.send(anyString(), anyString())).thenThrow(new IOException(EXPECTED));
- assertFalse(cambria.send(MY_PARTITION2, MY_MESSAGE2));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testCambriaPublisherWrapperSend_InvalidMsg() {
- CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
- cambria.publisher = mock(CambriaBatchingPublisher.class);
-
- cambria.send(MY_PARTITION, null);
- }
-
- @Test
- public void testCambriaPublisherWrapperClose() {
- CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class);
- CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
- cambria.publisher = pub;
-
- cambria.close();
- verify(pub).close();
-
- // try again, this time with an exception
- doThrow(new RuntimeException(EXPECTED)).when(pub).close();
- cambria.close();
- }
-
- @Test
- public void testDmaapPublisherWrapper() {
- // verify with different constructor arguments
- new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, true);
- new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, false);
- new DmaapPublisherWrapper(ProtocolTypeConstants.DME2, servers, MY_TOPIC, MY_USERNAME, MY_PASS, true) {};
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapPublisherWrapper_InvalidTopic() {
- new DmaapPublisherWrapper(ProtocolTypeConstants.DME2, servers, "", MY_USERNAME, MY_PASS, true) {};
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapPublisherWrapper_Aaf_NullServers() {
- new DmaapAafPublisherWrapper(null, MY_TOPIC, MY_USERNAME, MY_PASS, true);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapPublisherWrapper_Aaf_NoServers() {
- new DmaapAafPublisherWrapper(Collections.emptyList(), MY_TOPIC, MY_USERNAME, MY_PASS, true);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapPublisherWrapper_InvalidProtocol() {
- new DmaapPublisherWrapper(ProtocolTypeConstants.HTTPNOAUTH, servers, MY_TOPIC, MY_USERNAME, MY_PASS, true) {};
- }
-
- @Test
- public void testDmaapPublisherWrapperClose() throws Exception {
- MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class);
- DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, true);
- dmaap.publisher = pub;
-
- dmaap.close();
- verify(pub).close(anyLong(), any(TimeUnit.class));
-
- // close, but with exception from publisher
- doThrow(new IOException(EXPECTED)).when(pub).close(anyLong(), any(TimeUnit.class));
- dmaap.close();
- }
-
- @Test
- public void testDmaapPublisherWrapperSend() {
- MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class);
- DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, true);
- dmaap.publisher = pub;
-
- // null response
- assertTrue(dmaap.send(MY_PARTITION, MY_MESSAGE));
- verify(pub).setPubResponse(any(MRPublisherResponse.class));
- verify(pub).send(MY_PARTITION, MY_MESSAGE);
-
- // with response
- pub = mock(MRSimplerBatchPublisher.class);
- dmaap.publisher = pub;
-
- MRPublisherResponse resp = new MRPublisherResponse();
- when(pub.sendBatchWithResponse()).thenReturn(resp);
- assertTrue(dmaap.send(MY_PARTITION, MY_MESSAGE));
- verify(pub).setPubResponse(any(MRPublisherResponse.class));
- verify(pub).send(MY_PARTITION, MY_MESSAGE);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapPublisherWrapperSend_NullMessage() {
- MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class);
- DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, true);
- dmaap.publisher = pub;
-
- dmaap.send(MY_PARTITION, null);
- }
-
- @Test
- public void testDmaapDmePublisherWrapper() {
- // verify with different parameters
- new DmaapDmePublisherWrapper(makeBuilder().build());
- new DmaapDmePublisherWrapper(makeBuilder().additionalProps(null).build());
-
- addProps.put(ROUTE_PROP, MY_ROUTE);
- new DmaapDmePublisherWrapper(makeBuilder().build());
- new DmaapDmePublisherWrapper(makeBuilder().partner(null).build());
-
- addProps.put("null-value", null);
- new DmaapDmePublisherWrapper(makeBuilder().build());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmePublisherWrapper_InvalidEnv() {
- new DmaapDmePublisherWrapper(makeBuilder().environment(null).build());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmePublisherWrapper_InvalidAft() {
- new DmaapDmePublisherWrapper(makeBuilder().aftEnvironment(null).build());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmePublisherWrapper_InvalidLat() {
- new DmaapDmePublisherWrapper(makeBuilder().latitude(null).build());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmePublisherWrapper_InvalidLong() {
- new DmaapDmePublisherWrapper(makeBuilder().longitude(null).build());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testDmaapDmePublisherWrapper_InvalidPartner() {
- new DmaapDmePublisherWrapper(makeBuilder().partner(null).build());
- }
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java
index 01028045..0a2a5d34 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -53,7 +54,7 @@ public class BusTopicBaseTest extends TopicTestBase {
@Test
public void testSerialize() {
- new GsonTestUtils().compareGson(base, BusTopicBaseTest.class);
+ assertThatCode(() -> new GsonTestUtils().compareGson(base, BusTopicBaseTest.class)).doesNotThrowAnyException();
}
@Test
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java
index c00f2b56..3abb8b10 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java
@@ -2,7 +2,8 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,8 +25,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.util.Arrays;
import java.util.LinkedList;
+import java.util.List;
import java.util.function.BiConsumer;
import org.junit.Before;
import org.junit.Test;
@@ -41,12 +42,12 @@ public class BusTopicParamsTest extends TopicTestBase {
}
@Test
- public void test() {
+ public void testGetters() {
BusTopicParams params = makeBuilder().build();
assertEquals(addProps, params.getAdditionalProps());
assertEquals(MY_AFT_ENV, params.getAftEnvironment());
- assertEquals(true, params.isAllowSelfSignedCerts());
+ assertTrue(params.isAllowSelfSignedCerts());
assertEquals(MY_API_KEY, params.getApiKey());
assertEquals(MY_API_SECRET, params.getApiSecret());
assertEquals(MY_BASE_PATH, params.getBasePath());
@@ -59,7 +60,7 @@ public class BusTopicParamsTest extends TopicTestBase {
assertEquals(MY_HOST, params.getHostname());
assertEquals(MY_LAT, params.getLatitude());
assertEquals(MY_LONG, params.getLongitude());
- assertEquals(true, params.isManaged());
+ assertTrue(params.isManaged());
assertEquals(MY_PARTITION, params.getPartitionId());
assertEquals(MY_PARTNER, params.getPartner());
assertEquals(MY_PASS, params.getPassword());
@@ -67,13 +68,21 @@ public class BusTopicParamsTest extends TopicTestBase {
assertEquals(servers, params.getServers());
assertEquals(MY_TOPIC, params.getTopic());
assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
- assertEquals(true, params.isUseHttps());
+ assertTrue(params.isUseHttps());
assertEquals(MY_USERNAME, params.getUserName());
+ }
+ @Test
+ public void testBooleanGetters() {
// ensure that booleans are independent of each other
- testBoolean("true:false:false", (bldr, flag) -> bldr.allowSelfSignedCerts(flag));
- testBoolean("false:true:false", (bldr, flag) -> bldr.managed(flag));
- testBoolean("false:false:true", (bldr, flag) -> bldr.useHttps(flag));
+ testBoolean("true:false:false", TopicParamsBuilder::allowSelfSignedCerts);
+ testBoolean("false:true:false", TopicParamsBuilder::managed);
+ testBoolean("false:false:true", TopicParamsBuilder::useHttps);
+ }
+
+ @Test
+ public void testValidators() {
+ BusTopicParams params = makeBuilder().build();
// test validity methods
assertTrue(params.isAdditionalPropsValid());
@@ -94,8 +103,10 @@ public class BusTopicParamsTest extends TopicTestBase {
assertFalse(params.isServersInvalid());
assertFalse(params.isTopicInvalid());
assertTrue(params.isUserNameValid());
+ }
- // test inverted validity
+ @Test
+ public void testInvertedValidators() {
assertFalse(makeBuilder().additionalProps(null).build().isAdditionalPropsValid());
assertTrue(makeBuilder().aftEnvironment("").build().isAftEnvironmentInvalid());
assertFalse(makeBuilder().apiKey("").build().isApiKeyValid());
@@ -114,15 +125,15 @@ public class BusTopicParamsTest extends TopicTestBase {
assertTrue(makeBuilder().port(65536).build().isPortInvalid());
assertTrue(makeBuilder().servers(null).build().isServersInvalid());
assertTrue(makeBuilder().servers(new LinkedList<>()).build().isServersInvalid());
- assertTrue(makeBuilder().servers(Arrays.asList("")).build().isServersInvalid());
- assertFalse(makeBuilder().servers(Arrays.asList("one-server")).build().isServersInvalid());
+ assertTrue(makeBuilder().servers(List.of("")).build().isServersInvalid());
+ assertFalse(makeBuilder().servers(List.of("one-server")).build().isServersInvalid());
assertTrue(makeBuilder().topic("").build().isTopicInvalid());
assertFalse(makeBuilder().userName("").build().isUserNameValid());
}
/**
* Tests the boolean methods by applying a function, once with {@code false} and once
- * with {@code true}. Verifies that all of the boolean methods return the correct
+ * with {@code true}. Verifies that all the boolean methods return the correct
* value by concatenating them.
*
* @param expectedTrue the string that is expected when {@code true} is passed to the
@@ -137,7 +148,7 @@ public class BusTopicParamsTest extends TopicTestBase {
BusTopicParams params = builder.build();
assertEquals("false:false:false",
- "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
+ params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
// now try the "true" case
@@ -145,6 +156,6 @@ public class BusTopicParamsTest extends TopicTestBase {
params = builder.build();
assertEquals(expectedTrue,
- "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
+ params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java
index 634ee762..7aa70b2a 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java
@@ -3,6 +3,7 @@
* policy-endpoints
* ================================================================================
* Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +21,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -31,6 +33,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
+import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -61,7 +64,8 @@ public class InlineBusTopicSinkTest extends TopicTestBase {
@Test
public void testSerialize() {
- new GsonTestUtils().compareGson(sink, InlineBusTopicSinkTest.class);
+ assertThatCode(() -> new GsonTestUtils().compareGson(sink, InlineBusTopicSinkTest.class))
+ .doesNotThrowAnyException();
}
@Test
@@ -122,7 +126,7 @@ public class InlineBusTopicSinkTest extends TopicTestBase {
verify(pub).send(MY_PARTITION, MY_MESSAGE);
verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
- assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(sink.getRecentEvents()));
+ assertEquals(List.of(MY_MESSAGE), Arrays.asList(sink.getRecentEvents()));
// arrange for send to throw an exception
when(pub.send(anyString(), anyString())).thenThrow(new RuntimeException(EXPECTED));
@@ -136,8 +140,7 @@ public class InlineBusTopicSinkTest extends TopicTestBase {
@Test(expected = IllegalArgumentException.class)
public void testSend_NullMessage() {
sink.start();
- BusPublisher pub = mock(BusPublisher.class);
- sink.publisher = pub;
+ sink.publisher = mock(BusPublisher.class);
sink.send(null);
}
@@ -145,16 +148,14 @@ public class InlineBusTopicSinkTest extends TopicTestBase {
@Test(expected = IllegalArgumentException.class)
public void testSend_EmptyMessage() {
sink.start();
- BusPublisher pub = mock(BusPublisher.class);
- sink.publisher = pub;
+ sink.publisher = mock(BusPublisher.class);
sink.send("");
}
@Test(expected = IllegalStateException.class)
public void testSend_NotStarted() {
- BusPublisher pub = mock(BusPublisher.class);
- sink.publisher = pub;
+ sink.publisher = mock(BusPublisher.class);
sink.send(MY_MESSAGE);
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.java
deleted file mode 100644
index d9bc990b..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.utils.gson.GsonTestUtils;
-
-public class InlineDmaapTopicSinkTest extends TopicTestBase {
- private InlineDmaapTopicSink sink;
-
- /**
- * Creates the object to be tested.
- */
- @Before
- @Override
- public void setUp() {
- super.setUp();
-
- sink = new InlineDmaapTopicSink(makeBuilder().build());
- }
-
- @After
- public void tearDown() {
- sink.shutdown();
- }
-
- @Test
- public void testSerialize() {
- new GsonTestUtils().compareGson(sink, InlineDmaapTopicSinkTest.class);
- }
-
- @Test
- public void testToString() {
- assertTrue(sink.toString().startsWith("InlineDmaapTopicSink ["));
- }
-
- @Test
- public void testInit() {
- // nothing null
- sink = new InlineDmaapTopicSink(makeBuilder().build());
- sink.init();
- sink.shutdown();
-
- // no DME2 info
- sink = new InlineDmaapTopicSink(makeBuilder().environment(null).aftEnvironment(null).latitude(null)
- .longitude(null).partner(null).build());
- sink.init();
- sink.shutdown();
- }
-
- @Test
- public void testGetTopicCommInfrastructure() {
- assertEquals(CommInfrastructure.DMAAP, sink.getTopicCommInfrastructure());
- }
-
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java
index a45504f2..643025c2 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022-2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -28,10 +29,9 @@ import org.junit.Before;
import org.junit.Test;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.utils.gson.GsonTestUtils;
-public class InlineUebTopicSinkTest extends TopicTestBase {
- private InlineUebTopicSink sink;
+public class InlineKafkaTopicSinkTest extends TopicTestBase {
+ private InlineKafkaTopicSink sink;
/**
* Creates the object to be tested.
@@ -41,7 +41,7 @@ public class InlineUebTopicSinkTest extends TopicTestBase {
public void setUp() {
super.setUp();
- sink = new InlineUebTopicSink(makeBuilder().build());
+ sink = new InlineKafkaTopicSink(makeKafkaBuilder().build());
}
@After
@@ -50,23 +50,21 @@ public class InlineUebTopicSinkTest extends TopicTestBase {
}
@Test
- public void testSerialize() {
- new GsonTestUtils().compareGson(sink, InlineUebTopicSinkTest.class);
- }
-
- @Test
public void testToString() {
- assertTrue(sink.toString().startsWith("InlineUebTopicSink ["));
+ assertTrue(sink.toString().startsWith("InlineKafkaTopicSink ["));
}
@Test
public void testInit() {
+ // nothing null
+ sink = new InlineKafkaTopicSink(makeKafkaBuilder().build());
sink.init();
+ assertThatCode(() -> sink.shutdown()).doesNotThrowAnyException();
}
@Test
public void testGetTopicCommInfrastructure() {
- assertEquals(CommInfrastructure.UEB, sink.getTopicCommInfrastructure());
+ assertEquals(CommInfrastructure.KAFKA, sink.getTopicCommInfrastructure());
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java
index 16d74df2..dbdd8813 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,8 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -42,8 +44,8 @@ import org.mockito.stubbing.Answer;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
import org.onap.policy.common.utils.gson.GsonTestUtils;
+import org.onap.policy.common.utils.network.NetworkUtil;
public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
private Thread thread;
@@ -72,7 +74,8 @@ public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
@Test
public void testSerialize() {
- new GsonTestUtils().compareGson(source, SingleThreadedBusTopicSourceTest.class);
+ assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedBusTopicSourceTest.class))
+ .doesNotThrowAnyException();
}
@Test
@@ -159,11 +162,30 @@ public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
@Test
public void testSingleThreadedBusTopicSource() {
+ // Note: if the value contains "-", it's probably a UUID
+
// verify that different wrappers can be built
- new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).build());
- new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerInstance(null).build());
- new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchTimeout(-1).build());
- new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchLimit(-1).build());
+ source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
+ assertThat(source.getConsumerGroup()).isEqualTo(MY_CONS_GROUP);
+ assertThat(source.getConsumerInstance()).isEqualTo(MY_CONS_INST);
+
+ // group is null => group is UUID, instance is as provided
+ source = new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).build());
+ assertThat(source.getConsumerGroup()).contains("-").isNotEqualTo(NetworkUtil.getHostname());
+ assertThat(source.getConsumerInstance()).isEqualTo(MY_CONS_INST);
+
+ // instance is null => group is as provided, instance is UUID
+ source = new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerInstance(null).build());
+ assertThat(source.getConsumerGroup()).isEqualTo(MY_CONS_GROUP);
+ assertThat(source.getConsumerInstance()).contains("-").isNotEqualTo(NetworkUtil.getHostname());
+
+ // group & instance are null => group is UUID, instance is hostname
+ source = new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).consumerInstance(null).build());
+ assertThat(source.getConsumerGroup()).contains("-").isNotEqualTo(NetworkUtil.getHostname());
+ assertThat(source.getConsumerInstance()).isEqualTo(NetworkUtil.getHostname());
+
+ assertThatCode(() -> new SingleThreadedBusTopicSourceImpl(
+ makeBuilder().fetchLimit(-1).fetchTimeout(-1).build())).doesNotThrowAnyException();
}
@Test
@@ -284,22 +306,6 @@ public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
}
@Test
- public void testSetFilter() {
- FilterableBusConsumer filt = mock(FilterableBusConsumer.class);
- cons = filt;
-
- source.start();
- source.setFilter("my-filter");
- verify(filt).setFilter("my-filter");
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testSetFilter_Unsupported() {
- source.start();
- source.setFilter("unsupported-filter");
- }
-
- @Test
public void testGetConsumerGroup() {
assertEquals(MY_CONS_GROUP, source.getConsumerGroup());
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.java
deleted file mode 100644
index b7faf161..00000000
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * policy-endpoints
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.common.endpoints.event.comm.bus.internal;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.net.MalformedURLException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
-import org.onap.policy.common.utils.gson.GsonTestUtils;
-
-public class SingleThreadedDmaapTopicSourceTest extends TopicTestBase {
- private static final String SOURCE_NAME = "SingleThreadedDmaapTopicSource [";
- private SingleThreadedDmaapTopicSource source;
-
- /**
- * Creates the object to be tested.
- */
- @Before
- @Override
- public void setUp() {
- super.setUp();
-
- source = new SingleThreadedDmaapTopicSource(makeBuilder().build());
- }
-
- @After
- public void tearDown() {
- source.shutdown();
- }
-
- @Test
- public void testSerialize() {
- new GsonTestUtils().compareGson(source, SingleThreadedDmaapTopicSourceTest.class);
- }
-
- @Test
- public void testToString() {
- assertTrue(source.toString().startsWith(SOURCE_NAME));
- source.shutdown();
-
- // try with null password
- source = new SingleThreadedDmaapTopicSource(makeBuilder().password(null).build());
- assertTrue(source.toString().startsWith(SOURCE_NAME));
- source.shutdown();
-
- // try with empty password
- source = new SingleThreadedDmaapTopicSource(makeBuilder().password("").build());
- assertTrue(source.toString().startsWith(SOURCE_NAME));
- source.shutdown();
- }
-
- @Test
- public void testInit() {
- // verify with different parameters
- new SingleThreadedDmaapTopicSource(makeBuilder().userName(null).build()).shutdown();
- new SingleThreadedDmaapTopicSource(makeBuilder().environment(null).aftEnvironment(null).latitude(null)
- .longitude(null).partner(null).build()).shutdown();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testSingleThreadedDmaapTopicSource_Ex() {
- new SingleThreadedDmaapTopicSource(makeBuilder().build()) {
- @Override
- public void init() throws MalformedURLException {
- throw new MalformedURLException(EXPECTED);
- }
- }.shutdown();
- }
-
- @Test
- public void testGetTopicCommInfrastructure() {
- assertEquals(CommInfrastructure.DMAAP, source.getTopicCommInfrastructure());
- }
-
-}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java
index 2ff353b8..6b63c9f4 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -30,8 +31,8 @@ import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
import org.onap.policy.common.utils.gson.GsonTestUtils;
-public class SingleThreadedUebTopicSourceTest extends TopicTestBase {
- private SingleThreadedUebTopicSource source;
+public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase {
+ private SingleThreadedKafkaTopicSource source;
/**
* Creates the object to be tested.
@@ -41,7 +42,7 @@ public class SingleThreadedUebTopicSourceTest extends TopicTestBase {
public void setUp() {
super.setUp();
- source = new SingleThreadedUebTopicSource(makeBuilder().build());
+ source = new SingleThreadedKafkaTopicSource(makeKafkaBuilder().build());
}
@After
@@ -49,20 +50,20 @@ public class SingleThreadedUebTopicSourceTest extends TopicTestBase {
source.shutdown();
}
- @Test
public void testSerialize() {
- new GsonTestUtils().compareGson(source, SingleThreadedUebTopicSourceTest.class);
+ assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedKafkaTopicSourceTest.class))
+ .doesNotThrowAnyException();
}
@Test
public void testToString() {
- assertTrue(source.toString().startsWith("SingleThreadedUebTopicSource ["));
+ assertTrue(source.toString().startsWith("SingleThreadedKafkaTopicSource ["));
source.shutdown();
}
@Test
public void testGetTopicCommInfrastructure() {
- assertEquals(CommInfrastructure.UEB, source.getTopicCommInfrastructure());
+ assertEquals(CommInfrastructure.KAFKA, source.getTopicCommInfrastructure());
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java
index 0cf1486f..0f09b12e 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java
@@ -20,6 +20,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -97,7 +98,7 @@ public class TopicBaseTest extends TopicTestBase {
@Test
public void testSerialize() {
- new GsonTestUtils().compareGson(base, TopicBaseTest.class);
+ assertThatCode(() -> new GsonTestUtils().compareGson(base, TopicBaseTest.class)).doesNotThrowAnyException();
}
@Test
@@ -193,9 +194,15 @@ public class TopicBaseTest extends TopicTestBase {
assertTrue(base.unlock());
assertEquals(1, base.startCount);
assertEquals(1, base.stopCount);
+ }
+
+ /**
+ * Tests lock/unlock when the stop/start methods return false.
+ */
+ @Test
+ public void testLock_testUnlock_FalseReturns() {
// lock, but stop returns false
- base = new TopicBaseImpl(servers, MY_TOPIC);
base.stopReturn = false;
assertFalse(base.lock());
assertTrue(base.isLocked());
@@ -206,9 +213,15 @@ public class TopicBaseTest extends TopicTestBase {
assertFalse(base.unlock());
assertFalse(base.isLocked());
assertTrue(base.unlock());
+ }
+
+ /**
+ * Tests lock/unlock when the start method throws an exception.
+ */
+ @Test
+ public void testLock_testUnlock_Exception() {
// lock & re-lock, but start throws an exception
- base = new TopicBaseImpl(servers, MY_TOPIC);
base.startEx = true;
assertTrue(base.lock());
assertFalse(base.unlock());