diff options
Diffstat (limited to 'policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal')
11 files changed, 295 insertions, 621 deletions
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java index 0255c100..2c33a257 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java @@ -2,7 +2,8 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,218 +21,257 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.att.nsa.cambria.client.CambriaConsumer; import java.io.IOException; -import java.util.Arrays; +import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.HashMap; import java.util.List; -import org.apache.commons.collections4.IteratorUtils; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.onap.dmaap.mr.client.impl.MRConsumerImpl; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper; -import org.powermock.reflect.Whitebox; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; public class BusConsumerTest extends TopicTestBase { + private static final int SHORT_TIMEOUT_MILLIS = 10; + private static final int LONG_TIMEOUT_MILLIS = 3000; + + @Mock + KafkaConsumer<String, String> mockedKafkaConsumer; + + AutoCloseable closeable; + @Before @Override public void setUp() { super.setUp(); + closeable = MockitoAnnotations.openMocks(this); } - @Test - public void testCambriaConsumerWrapper() { - // verify that different wrappers can be built - new CambriaConsumerWrapper(makeBuilder().build()); - new CambriaConsumerWrapper(makeBuilder().useHttps(false).build()); - new CambriaConsumerWrapper(makeBuilder().useHttps(true).build()); - new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build()); - new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build()); - new CambriaConsumerWrapper(makeBuilder().apiKey(null).build()); - new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build()); - new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build()); - new CambriaConsumerWrapper(makeBuilder().userName(null).build()); - new CambriaConsumerWrapper(makeBuilder().password(null).build()); - new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()); + @After + public void tearDown() throws Exception { + closeable.close(); } - @Test - public void testCambriaConsumerWrapperFetch() throws Exception { - CambriaConsumer inner = mock(CambriaConsumer.class); - List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2); - when(inner.fetch()).thenReturn(lst); - - CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build()); - Whitebox.setInternalState(cons, "consumer", inner); - - assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator())); - // arrange to throw exception next time fetch is called - IOException ex = new IOException(EXPECTED); - when(inner.fetch()).thenThrow(ex); - - cons.fetchTimeout = 10; - - try { - cons.fetch(); - fail("missing exception"); - - } catch (IOException e) { - assertEquals(ex, e); - } + @Test + public void testFetchingBusConsumer() { + // should not be negative + var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build()); + assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH); + + // should not be zero + cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build()); + assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH); + + // should not be too large + cons = new FetchingBusConsumerImpl( + makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build()); + assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH); + + // should not be what was specified + cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build()); + assertThat(cons.getSleepTime()).isEqualTo(100); } @Test - public void testCambriaConsumerWrapperClose() { - CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build()); - - // set filter several times to cause different branches of close() to be executed - for (int count = 0; count < 3; ++count) { - cons.close(); - cons.setFilter("close=" + count); - } + public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException { + + var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) { + + private CountDownLatch started = new CountDownLatch(1); + + @Override + protected void sleepAfterFetchFailure() { + started.countDown(); + super.sleepAfterFetchFailure(); + } + }; + + // full sleep + long tstart = System.currentTimeMillis(); + cons.sleepAfterFetchFailure(); + assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS); + + // close while sleeping - sleep should halt prematurely + cons.fetchTimeout = LONG_TIMEOUT_MILLIS; + cons.started = new CountDownLatch(1); + Thread thread = new Thread(cons::sleepAfterFetchFailure); + tstart = System.currentTimeMillis(); + thread.start(); + cons.started.await(); + cons.close(); + thread.join(); + assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS); + + // interrupt while sleeping - sleep should halt prematurely + cons.fetchTimeout = LONG_TIMEOUT_MILLIS; + cons.started = new CountDownLatch(1); + thread = new Thread(cons::sleepAfterFetchFailure); + tstart = System.currentTimeMillis(); + thread.start(); + cons.started.await(); + thread.interrupt(); + thread.join(); + assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS); } @Test - public void testCambriaConsumerWrapperSetFilter() { - // set filter several times to cause different branches to be executed - CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build()); - for (int count = 0; count < 3; ++count) { - cons.setFilter("set-filter=" + count); - } + public void testKafkaConsumerWrapper() { + // verify that different wrappers can be built + assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException(); } - @Test - public void testCambriaConsumerWrapperToString() { - assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString()); + @Test(expected = IllegalArgumentException.class) + public void testKafkaConsumerWrapper_InvalidTopic() { + new KafkaConsumerWrapper(makeBuilder().topic(null).build()); } @Test - public void testDmaapConsumerWrapper() throws Exception { - // verify that different wrappers can be built - new DmaapAafConsumerWrapper(makeBuilder().build()); + public void testKafkaConsumerWrapperFetch() { + + //Setup Properties for consumer + Properties kafkaProps = new Properties(); + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test"); + kafkaProps.setProperty("enable.auto.commit", "true"); + kafkaProps.setProperty("auto.commit.interval.ms", "1000"); + kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps); + kafka.consumer = consumer; + + assertThrows(java.lang.IllegalStateException.class, () -> kafka.fetch().iterator().hasNext()); + consumer.close(); } - @Test(expected = IllegalArgumentException.class) - public void testDmaapConsumerWrapper_InvalidTopic() throws Exception { - new DmaapAafConsumerWrapper(makeBuilder().topic(null).build()); + @Test + public void testFetchNoMessages() throws IOException { + KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + kafkaConsumerWrapper.consumer = mockedKafkaConsumer; + + when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + + Iterable<String> result = kafkaConsumerWrapper.fetch(); + + verify(mockedKafkaConsumer, times(1)).poll(any()); + + assertThat(result != null); + + assertThat(!result.iterator().hasNext()); + + mockedKafkaConsumer.close(); } @Test - public void testDmaapConsumerWrapperFetch() throws Exception { - DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build()); - MRConsumerImpl cons = mock(MRConsumerImpl.class); + public void testFetchWithMessages() { + // Setup + KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + kafkaConsumerWrapper.consumer = mockedKafkaConsumer; - dmaap.fetchTimeout = 5; - dmaap.consumer = cons; + ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value"); + Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>(); + recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record)); + ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap); - // null return - when(cons.fetchWithReturnConsumerResponse()).thenReturn(null); - assertFalse(dmaap.fetch().iterator().hasNext()); + when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords); - // with messages, 200 - List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2); - MRConsumerResponse resp = new MRConsumerResponse(); - resp.setResponseCode("200"); - resp.setActualMessages(lst); - when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp); + Iterable<String> result = kafkaConsumerWrapper.fetch(); - assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator())); + verify(mockedKafkaConsumer, times(1)).poll(any()); - // null messages - resp.setActualMessages(null); - when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp); + verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class)); - assertFalse(dmaap.fetch().iterator().hasNext()); + assertThat(result != null); - // with messages, NOT 200 - resp.setResponseCode("400"); - resp.setActualMessages(lst); - when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp); + assertThat(result.iterator().hasNext()); - assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator())); - } + assertThat(result.iterator().next().equals("value")); - @Test - public void testDmaapConsumerWrapperClose() throws Exception { - new DmaapAafConsumerWrapper(makeBuilder().build()).close(); + mockedKafkaConsumer.close(); } @Test - public void testDmaapConsumerWrapperToString() throws Exception { - assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString()); - } + public void testFetchWithMessagesAndTraceparent() { + // Setup + KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build()); + kafkaConsumerWrapper.consumer = mockedKafkaConsumer; - @Test - public void testDmaapAafConsumerWrapper() throws Exception { - // verify that different wrappers can be built - new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build()); - new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()); - } + ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value"); + record.headers().add( + "traceparent", + "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8) + ); - @Test(expected = IllegalArgumentException.class) - public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception { - /* - * Unfortunately, the MR code intercepts this and throws an exception before the - * wrapper gets a chance to check it, thus this test does not improve the coverage - * for the constructor. - */ - new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build()); - } + Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>(); + recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record)); + ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap); - @Test - public void testDmaapAafConsumerWrapperToString() throws Exception { - assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString()); - } + when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords); - @Test - public void testDmaapDmeConsumerWrapper() throws Exception { - // verify that different wrappers can be built - new DmaapDmeConsumerWrapper(makeBuilder().build()); - new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build()); - new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build()); - new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build()); - - addProps.put(ROUTE_PROP, MY_ROUTE); - new DmaapDmeConsumerWrapper(makeBuilder().build()); - new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()); - } + Iterable<String> result = kafkaConsumerWrapper.fetch(); - @Test(expected = IllegalArgumentException.class) - public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception { - new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build()); - } + verify(mockedKafkaConsumer, times(1)).poll(any()); - @Test(expected = IllegalArgumentException.class) - public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception { - new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build()); + verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class)); + + assertThat(result != null); + + assertThat(result.iterator().hasNext()); + + assertThat(result.iterator().next().equals("value")); + + mockedKafkaConsumer.close(); } - @Test(expected = IllegalArgumentException.class) - public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception { - new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build()); + + @Test + public void testKafkaConsumerWrapperClose() { + assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException(); } - @Test(expected = IllegalArgumentException.class) - public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception { - new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build()); + @Test + public void testKafkaConsumerWrapperToString() { + assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString()); } - @Test(expected = IllegalArgumentException.class) - public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception { - new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()); + private static class FetchingBusConsumerImpl extends FetchingBusConsumer { + + protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) { + super(busTopicParams); + } + + @Override + public Iterable<String> fetch() { + return null; + } } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java deleted file mode 100644 index 5a933e9b..00000000 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import java.io.IOException; -import java.util.Collections; -import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Test; -import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; -import org.onap.dmaap.mr.client.response.MRPublisherResponse; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; -import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.CambriaPublisherWrapper; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapAafPublisherWrapper; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapDmePublisherWrapper; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapPublisherWrapper; - -public class BusPublisherTest extends TopicTestBase { - - @Before - @Override - public void setUp() { - super.setUp(); - } - - @Test - public void testCambriaPublisherWrapper() { - // verify that different wrappers can be built - new CambriaPublisherWrapper(makeBuilder().build()); - new CambriaPublisherWrapper(makeBuilder().useHttps(false).build()); - new CambriaPublisherWrapper(makeBuilder().useHttps(true).build()); - new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build()); - new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build()); - new CambriaPublisherWrapper(makeBuilder().apiKey(null).build()); - new CambriaPublisherWrapper(makeBuilder().apiSecret(null).build()); - new CambriaPublisherWrapper(makeBuilder().apiKey(null).apiSecret(null).build()); - new CambriaPublisherWrapper(makeBuilder().userName(null).build()); - new CambriaPublisherWrapper(makeBuilder().password(null).build()); - new CambriaPublisherWrapper(makeBuilder().userName(null).password(null).build()); - } - - @Test - public void testCambriaPublisherWrapperSend() throws Exception { - CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class); - CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build()); - cambria.publisher = pub; - - assertTrue(cambria.send(MY_PARTITION, MY_MESSAGE)); - - // publisher exception - when(pub.send(anyString(), anyString())).thenThrow(new IOException(EXPECTED)); - assertFalse(cambria.send(MY_PARTITION2, MY_MESSAGE2)); - } - - @Test(expected = IllegalArgumentException.class) - public void testCambriaPublisherWrapperSend_InvalidMsg() { - CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build()); - cambria.publisher = mock(CambriaBatchingPublisher.class); - - cambria.send(MY_PARTITION, null); - } - - @Test - public void testCambriaPublisherWrapperClose() { - CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class); - CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build()); - cambria.publisher = pub; - - cambria.close(); - verify(pub).close(); - - // try again, this time with an exception - doThrow(new RuntimeException(EXPECTED)).when(pub).close(); - cambria.close(); - } - - @Test - public void testDmaapPublisherWrapper() { - // verify with different constructor arguments - new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, true); - new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, false); - new DmaapPublisherWrapper(ProtocolTypeConstants.DME2, servers, MY_TOPIC, MY_USERNAME, MY_PASS, true) {}; - } - - @Test(expected = IllegalArgumentException.class) - public void testDmaapPublisherWrapper_InvalidTopic() { - new DmaapPublisherWrapper(ProtocolTypeConstants.DME2, servers, "", MY_USERNAME, MY_PASS, true) {}; - } - - @Test(expected = IllegalArgumentException.class) - public void testDmaapPublisherWrapper_Aaf_NullServers() { - new DmaapAafPublisherWrapper(null, MY_TOPIC, MY_USERNAME, MY_PASS, true); - } - - @Test(expected = IllegalArgumentException.class) - public void testDmaapPublisherWrapper_Aaf_NoServers() { - new DmaapAafPublisherWrapper(Collections.emptyList(), MY_TOPIC, MY_USERNAME, MY_PASS, true); - } - - @Test(expected = IllegalArgumentException.class) - public void testDmaapPublisherWrapper_InvalidProtocol() { - new DmaapPublisherWrapper(ProtocolTypeConstants.HTTPNOAUTH, servers, MY_TOPIC, MY_USERNAME, MY_PASS, true) {}; - } - - @Test - public void testDmaapPublisherWrapperClose() throws Exception { - MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class); - DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, true); - dmaap.publisher = pub; - - dmaap.close(); - verify(pub).close(anyLong(), any(TimeUnit.class)); - - // close, but with exception from publisher - doThrow(new IOException(EXPECTED)).when(pub).close(anyLong(), any(TimeUnit.class)); - dmaap.close(); - } - - @Test - public void testDmaapPublisherWrapperSend() { - MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class); - DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, true); - dmaap.publisher = pub; - - // null response - assertTrue(dmaap.send(MY_PARTITION, MY_MESSAGE)); - verify(pub).setPubResponse(any(MRPublisherResponse.class)); - verify(pub).send(MY_PARTITION, MY_MESSAGE); - - // with response - pub = mock(MRSimplerBatchPublisher.class); - dmaap.publisher = pub; - - MRPublisherResponse resp = new MRPublisherResponse(); - when(pub.sendBatchWithResponse()).thenReturn(resp); - assertTrue(dmaap.send(MY_PARTITION, MY_MESSAGE)); - verify(pub).setPubResponse(any(MRPublisherResponse.class)); - verify(pub).send(MY_PARTITION, MY_MESSAGE); - } - - @Test(expected = IllegalArgumentException.class) - public void testDmaapPublisherWrapperSend_NullMessage() { - MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class); - DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASS, true); - dmaap.publisher = pub; - - dmaap.send(MY_PARTITION, null); - } - - @Test - public void testDmaapDmePublisherWrapper() { - // verify with different parameters - new DmaapDmePublisherWrapper(makeBuilder().build()); - new DmaapDmePublisherWrapper(makeBuilder().additionalProps(null).build()); - - addProps.put(ROUTE_PROP, MY_ROUTE); - new DmaapDmePublisherWrapper(makeBuilder().build()); - new DmaapDmePublisherWrapper(makeBuilder().partner(null).build()); - - addProps.put("null-value", null); - new DmaapDmePublisherWrapper(makeBuilder().build()); - } - - @Test(expected = IllegalArgumentException.class) - public void testDmaapDmePublisherWrapper_InvalidEnv() { - new DmaapDmePublisherWrapper(makeBuilder().environment(null).build()); - } - - @Test(expected = IllegalArgumentException.class) - public void testDmaapDmePublisherWrapper_InvalidAft() { - new DmaapDmePublisherWrapper(makeBuilder().aftEnvironment(null).build()); - } - - @Test(expected = IllegalArgumentException.class) - public void testDmaapDmePublisherWrapper_InvalidLat() { - new DmaapDmePublisherWrapper(makeBuilder().latitude(null).build()); - } - - @Test(expected = IllegalArgumentException.class) - public void testDmaapDmePublisherWrapper_InvalidLong() { - new DmaapDmePublisherWrapper(makeBuilder().longitude(null).build()); - } - - @Test(expected = IllegalArgumentException.class) - public void testDmaapDmePublisherWrapper_InvalidPartner() { - new DmaapDmePublisherWrapper(makeBuilder().partner(null).build()); - } -} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java index 01028045..0a2a5d34 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -53,7 +54,7 @@ public class BusTopicBaseTest extends TopicTestBase { @Test public void testSerialize() { - new GsonTestUtils().compareGson(base, BusTopicBaseTest.class); + assertThatCode(() -> new GsonTestUtils().compareGson(base, BusTopicBaseTest.class)).doesNotThrowAnyException(); } @Test diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java index c00f2b56..3abb8b10 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java @@ -2,7 +2,8 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,8 +25,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.util.Arrays; import java.util.LinkedList; +import java.util.List; import java.util.function.BiConsumer; import org.junit.Before; import org.junit.Test; @@ -41,12 +42,12 @@ public class BusTopicParamsTest extends TopicTestBase { } @Test - public void test() { + public void testGetters() { BusTopicParams params = makeBuilder().build(); assertEquals(addProps, params.getAdditionalProps()); assertEquals(MY_AFT_ENV, params.getAftEnvironment()); - assertEquals(true, params.isAllowSelfSignedCerts()); + assertTrue(params.isAllowSelfSignedCerts()); assertEquals(MY_API_KEY, params.getApiKey()); assertEquals(MY_API_SECRET, params.getApiSecret()); assertEquals(MY_BASE_PATH, params.getBasePath()); @@ -59,7 +60,7 @@ public class BusTopicParamsTest extends TopicTestBase { assertEquals(MY_HOST, params.getHostname()); assertEquals(MY_LAT, params.getLatitude()); assertEquals(MY_LONG, params.getLongitude()); - assertEquals(true, params.isManaged()); + assertTrue(params.isManaged()); assertEquals(MY_PARTITION, params.getPartitionId()); assertEquals(MY_PARTNER, params.getPartner()); assertEquals(MY_PASS, params.getPassword()); @@ -67,13 +68,21 @@ public class BusTopicParamsTest extends TopicTestBase { assertEquals(servers, params.getServers()); assertEquals(MY_TOPIC, params.getTopic()); assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic()); - assertEquals(true, params.isUseHttps()); + assertTrue(params.isUseHttps()); assertEquals(MY_USERNAME, params.getUserName()); + } + @Test + public void testBooleanGetters() { // ensure that booleans are independent of each other - testBoolean("true:false:false", (bldr, flag) -> bldr.allowSelfSignedCerts(flag)); - testBoolean("false:true:false", (bldr, flag) -> bldr.managed(flag)); - testBoolean("false:false:true", (bldr, flag) -> bldr.useHttps(flag)); + testBoolean("true:false:false", TopicParamsBuilder::allowSelfSignedCerts); + testBoolean("false:true:false", TopicParamsBuilder::managed); + testBoolean("false:false:true", TopicParamsBuilder::useHttps); + } + + @Test + public void testValidators() { + BusTopicParams params = makeBuilder().build(); // test validity methods assertTrue(params.isAdditionalPropsValid()); @@ -94,8 +103,10 @@ public class BusTopicParamsTest extends TopicTestBase { assertFalse(params.isServersInvalid()); assertFalse(params.isTopicInvalid()); assertTrue(params.isUserNameValid()); + } - // test inverted validity + @Test + public void testInvertedValidators() { assertFalse(makeBuilder().additionalProps(null).build().isAdditionalPropsValid()); assertTrue(makeBuilder().aftEnvironment("").build().isAftEnvironmentInvalid()); assertFalse(makeBuilder().apiKey("").build().isApiKeyValid()); @@ -114,15 +125,15 @@ public class BusTopicParamsTest extends TopicTestBase { assertTrue(makeBuilder().port(65536).build().isPortInvalid()); assertTrue(makeBuilder().servers(null).build().isServersInvalid()); assertTrue(makeBuilder().servers(new LinkedList<>()).build().isServersInvalid()); - assertTrue(makeBuilder().servers(Arrays.asList("")).build().isServersInvalid()); - assertFalse(makeBuilder().servers(Arrays.asList("one-server")).build().isServersInvalid()); + assertTrue(makeBuilder().servers(List.of("")).build().isServersInvalid()); + assertFalse(makeBuilder().servers(List.of("one-server")).build().isServersInvalid()); assertTrue(makeBuilder().topic("").build().isTopicInvalid()); assertFalse(makeBuilder().userName("").build().isUserNameValid()); } /** * Tests the boolean methods by applying a function, once with {@code false} and once - * with {@code true}. Verifies that all of the boolean methods return the correct + * with {@code true}. Verifies that all the boolean methods return the correct * value by concatenating them. * * @param expectedTrue the string that is expected when {@code true} is passed to the @@ -137,7 +148,7 @@ public class BusTopicParamsTest extends TopicTestBase { BusTopicParams params = builder.build(); assertEquals("false:false:false", - "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps()); + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps()); // now try the "true" case @@ -145,6 +156,6 @@ public class BusTopicParamsTest extends TopicTestBase { params = builder.build(); assertEquals(expectedTrue, - "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps()); + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps()); } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java index 634ee762..7aa70b2a 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSinkTest.java @@ -3,6 +3,7 @@ * policy-endpoints * ================================================================================ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +21,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -31,6 +33,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Arrays; +import java.util.List; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -61,7 +64,8 @@ public class InlineBusTopicSinkTest extends TopicTestBase { @Test public void testSerialize() { - new GsonTestUtils().compareGson(sink, InlineBusTopicSinkTest.class); + assertThatCode(() -> new GsonTestUtils().compareGson(sink, InlineBusTopicSinkTest.class)) + .doesNotThrowAnyException(); } @Test @@ -122,7 +126,7 @@ public class InlineBusTopicSinkTest extends TopicTestBase { verify(pub).send(MY_PARTITION, MY_MESSAGE); verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE); - assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(sink.getRecentEvents())); + assertEquals(List.of(MY_MESSAGE), Arrays.asList(sink.getRecentEvents())); // arrange for send to throw an exception when(pub.send(anyString(), anyString())).thenThrow(new RuntimeException(EXPECTED)); @@ -136,8 +140,7 @@ public class InlineBusTopicSinkTest extends TopicTestBase { @Test(expected = IllegalArgumentException.class) public void testSend_NullMessage() { sink.start(); - BusPublisher pub = mock(BusPublisher.class); - sink.publisher = pub; + sink.publisher = mock(BusPublisher.class); sink.send(null); } @@ -145,16 +148,14 @@ public class InlineBusTopicSinkTest extends TopicTestBase { @Test(expected = IllegalArgumentException.class) public void testSend_EmptyMessage() { sink.start(); - BusPublisher pub = mock(BusPublisher.class); - sink.publisher = pub; + sink.publisher = mock(BusPublisher.class); sink.send(""); } @Test(expected = IllegalStateException.class) public void testSend_NotStarted() { - BusPublisher pub = mock(BusPublisher.class); - sink.publisher = pub; + sink.publisher = mock(BusPublisher.class); sink.send(MY_MESSAGE); } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.java deleted file mode 100644 index d9bc990b..00000000 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSinkTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; -import org.onap.policy.common.utils.gson.GsonTestUtils; - -public class InlineDmaapTopicSinkTest extends TopicTestBase { - private InlineDmaapTopicSink sink; - - /** - * Creates the object to be tested. - */ - @Before - @Override - public void setUp() { - super.setUp(); - - sink = new InlineDmaapTopicSink(makeBuilder().build()); - } - - @After - public void tearDown() { - sink.shutdown(); - } - - @Test - public void testSerialize() { - new GsonTestUtils().compareGson(sink, InlineDmaapTopicSinkTest.class); - } - - @Test - public void testToString() { - assertTrue(sink.toString().startsWith("InlineDmaapTopicSink [")); - } - - @Test - public void testInit() { - // nothing null - sink = new InlineDmaapTopicSink(makeBuilder().build()); - sink.init(); - sink.shutdown(); - - // no DME2 info - sink = new InlineDmaapTopicSink(makeBuilder().environment(null).aftEnvironment(null).latitude(null) - .longitude(null).partner(null).build()); - sink.init(); - sink.shutdown(); - } - - @Test - public void testGetTopicCommInfrastructure() { - assertEquals(CommInfrastructure.DMAAP, sink.getTopicCommInfrastructure()); - } - -} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java index a45504f2..643025c2 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSinkTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2022-2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -28,10 +29,9 @@ import org.junit.Before; import org.junit.Test; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; -import org.onap.policy.common.utils.gson.GsonTestUtils; -public class InlineUebTopicSinkTest extends TopicTestBase { - private InlineUebTopicSink sink; +public class InlineKafkaTopicSinkTest extends TopicTestBase { + private InlineKafkaTopicSink sink; /** * Creates the object to be tested. @@ -41,7 +41,7 @@ public class InlineUebTopicSinkTest extends TopicTestBase { public void setUp() { super.setUp(); - sink = new InlineUebTopicSink(makeBuilder().build()); + sink = new InlineKafkaTopicSink(makeKafkaBuilder().build()); } @After @@ -50,23 +50,21 @@ public class InlineUebTopicSinkTest extends TopicTestBase { } @Test - public void testSerialize() { - new GsonTestUtils().compareGson(sink, InlineUebTopicSinkTest.class); - } - - @Test public void testToString() { - assertTrue(sink.toString().startsWith("InlineUebTopicSink [")); + assertTrue(sink.toString().startsWith("InlineKafkaTopicSink [")); } @Test public void testInit() { + // nothing null + sink = new InlineKafkaTopicSink(makeKafkaBuilder().build()); sink.init(); + assertThatCode(() -> sink.shutdown()).doesNotThrowAnyException(); } @Test public void testGetTopicCommInfrastructure() { - assertEquals(CommInfrastructure.UEB, sink.getTopicCommInfrastructure()); + assertEquals(CommInfrastructure.KAFKA, sink.getTopicCommInfrastructure()); } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java index 16d74df2..dbdd8813 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -42,8 +44,8 @@ import org.mockito.stubbing.Answer; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer; import org.onap.policy.common.utils.gson.GsonTestUtils; +import org.onap.policy.common.utils.network.NetworkUtil; public class SingleThreadedBusTopicSourceTest extends TopicTestBase { private Thread thread; @@ -72,7 +74,8 @@ public class SingleThreadedBusTopicSourceTest extends TopicTestBase { @Test public void testSerialize() { - new GsonTestUtils().compareGson(source, SingleThreadedBusTopicSourceTest.class); + assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedBusTopicSourceTest.class)) + .doesNotThrowAnyException(); } @Test @@ -159,11 +162,30 @@ public class SingleThreadedBusTopicSourceTest extends TopicTestBase { @Test public void testSingleThreadedBusTopicSource() { + // Note: if the value contains "-", it's probably a UUID + // verify that different wrappers can be built - new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).build()); - new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerInstance(null).build()); - new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchTimeout(-1).build()); - new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchLimit(-1).build()); + source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build()); + assertThat(source.getConsumerGroup()).isEqualTo(MY_CONS_GROUP); + assertThat(source.getConsumerInstance()).isEqualTo(MY_CONS_INST); + + // group is null => group is UUID, instance is as provided + source = new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).build()); + assertThat(source.getConsumerGroup()).contains("-").isNotEqualTo(NetworkUtil.getHostname()); + assertThat(source.getConsumerInstance()).isEqualTo(MY_CONS_INST); + + // instance is null => group is as provided, instance is UUID + source = new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerInstance(null).build()); + assertThat(source.getConsumerGroup()).isEqualTo(MY_CONS_GROUP); + assertThat(source.getConsumerInstance()).contains("-").isNotEqualTo(NetworkUtil.getHostname()); + + // group & instance are null => group is UUID, instance is hostname + source = new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).consumerInstance(null).build()); + assertThat(source.getConsumerGroup()).contains("-").isNotEqualTo(NetworkUtil.getHostname()); + assertThat(source.getConsumerInstance()).isEqualTo(NetworkUtil.getHostname()); + + assertThatCode(() -> new SingleThreadedBusTopicSourceImpl( + makeBuilder().fetchLimit(-1).fetchTimeout(-1).build())).doesNotThrowAnyException(); } @Test @@ -284,22 +306,6 @@ public class SingleThreadedBusTopicSourceTest extends TopicTestBase { } @Test - public void testSetFilter() { - FilterableBusConsumer filt = mock(FilterableBusConsumer.class); - cons = filt; - - source.start(); - source.setFilter("my-filter"); - verify(filt).setFilter("my-filter"); - } - - @Test(expected = UnsupportedOperationException.class) - public void testSetFilter_Unsupported() { - source.start(); - source.setFilter("unsupported-filter"); - } - - @Test public void testGetConsumerGroup() { assertEquals(MY_CONS_GROUP, source.getConsumerGroup()); } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.java deleted file mode 100644 index b7faf161..00000000 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSourceTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * policy-endpoints - * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm.bus.internal; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.net.MalformedURLException; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; -import org.onap.policy.common.utils.gson.GsonTestUtils; - -public class SingleThreadedDmaapTopicSourceTest extends TopicTestBase { - private static final String SOURCE_NAME = "SingleThreadedDmaapTopicSource ["; - private SingleThreadedDmaapTopicSource source; - - /** - * Creates the object to be tested. - */ - @Before - @Override - public void setUp() { - super.setUp(); - - source = new SingleThreadedDmaapTopicSource(makeBuilder().build()); - } - - @After - public void tearDown() { - source.shutdown(); - } - - @Test - public void testSerialize() { - new GsonTestUtils().compareGson(source, SingleThreadedDmaapTopicSourceTest.class); - } - - @Test - public void testToString() { - assertTrue(source.toString().startsWith(SOURCE_NAME)); - source.shutdown(); - - // try with null password - source = new SingleThreadedDmaapTopicSource(makeBuilder().password(null).build()); - assertTrue(source.toString().startsWith(SOURCE_NAME)); - source.shutdown(); - - // try with empty password - source = new SingleThreadedDmaapTopicSource(makeBuilder().password("").build()); - assertTrue(source.toString().startsWith(SOURCE_NAME)); - source.shutdown(); - } - - @Test - public void testInit() { - // verify with different parameters - new SingleThreadedDmaapTopicSource(makeBuilder().userName(null).build()).shutdown(); - new SingleThreadedDmaapTopicSource(makeBuilder().environment(null).aftEnvironment(null).latitude(null) - .longitude(null).partner(null).build()).shutdown(); - } - - @Test(expected = IllegalArgumentException.class) - public void testSingleThreadedDmaapTopicSource_Ex() { - new SingleThreadedDmaapTopicSource(makeBuilder().build()) { - @Override - public void init() throws MalformedURLException { - throw new MalformedURLException(EXPECTED); - } - }.shutdown(); - } - - @Test - public void testGetTopicCommInfrastructure() { - assertEquals(CommInfrastructure.DMAAP, source.getTopicCommInfrastructure()); - } - -} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java index 2ff353b8..6b63c9f4 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSourceTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -30,8 +31,8 @@ import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; import org.onap.policy.common.utils.gson.GsonTestUtils; -public class SingleThreadedUebTopicSourceTest extends TopicTestBase { - private SingleThreadedUebTopicSource source; +public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase { + private SingleThreadedKafkaTopicSource source; /** * Creates the object to be tested. @@ -41,7 +42,7 @@ public class SingleThreadedUebTopicSourceTest extends TopicTestBase { public void setUp() { super.setUp(); - source = new SingleThreadedUebTopicSource(makeBuilder().build()); + source = new SingleThreadedKafkaTopicSource(makeKafkaBuilder().build()); } @After @@ -49,20 +50,20 @@ public class SingleThreadedUebTopicSourceTest extends TopicTestBase { source.shutdown(); } - @Test public void testSerialize() { - new GsonTestUtils().compareGson(source, SingleThreadedUebTopicSourceTest.class); + assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedKafkaTopicSourceTest.class)) + .doesNotThrowAnyException(); } @Test public void testToString() { - assertTrue(source.toString().startsWith("SingleThreadedUebTopicSource [")); + assertTrue(source.toString().startsWith("SingleThreadedKafkaTopicSource [")); source.shutdown(); } @Test public void testGetTopicCommInfrastructure() { - assertEquals(CommInfrastructure.UEB, source.getTopicCommInfrastructure()); + assertEquals(CommInfrastructure.KAFKA, source.getTopicCommInfrastructure()); } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java index 0cf1486f..0f09b12e 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java @@ -20,6 +20,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -97,7 +98,7 @@ public class TopicBaseTest extends TopicTestBase { @Test public void testSerialize() { - new GsonTestUtils().compareGson(base, TopicBaseTest.class); + assertThatCode(() -> new GsonTestUtils().compareGson(base, TopicBaseTest.class)).doesNotThrowAnyException(); } @Test @@ -193,9 +194,15 @@ public class TopicBaseTest extends TopicTestBase { assertTrue(base.unlock()); assertEquals(1, base.startCount); assertEquals(1, base.stopCount); + } + + /** + * Tests lock/unlock when the stop/start methods return false. + */ + @Test + public void testLock_testUnlock_FalseReturns() { // lock, but stop returns false - base = new TopicBaseImpl(servers, MY_TOPIC); base.stopReturn = false; assertFalse(base.lock()); assertTrue(base.isLocked()); @@ -206,9 +213,15 @@ public class TopicBaseTest extends TopicTestBase { assertFalse(base.unlock()); assertFalse(base.isLocked()); assertTrue(base.unlock()); + } + + /** + * Tests lock/unlock when the start method throws an exception. + */ + @Test + public void testLock_testUnlock_Exception() { // lock & re-lock, but start throws an exception - base = new TopicBaseImpl(servers, MY_TOPIC); base.startEx = true; assertTrue(base.lock()); assertFalse(base.unlock()); |