summaryrefslogtreecommitdiffstats
path: root/src/test/java/org/onap/dcae/commonFunction
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/java/org/onap/dcae/commonFunction')
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java144
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java35
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java4
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java114
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java (renamed from src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java)47
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java126
6 files changed, 270 insertions, 200 deletions
diff --git a/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java b/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java
deleted file mode 100644
index f4955ac8..00000000
--- a/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.commonFunction;
-
-import static org.hamcrest.CoreMatchers.allOf;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertSame;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaPublisher;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.concurrent.TimeUnit;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-
-@RunWith(MockitoJUnitRunner.class)
-public class DmaapPublishersTest {
-
- @Mock
- private CambriaPublisherFactory publisherFactory;
- @Mock
- private CambriaBatchingPublisher cambriaPublisher;
- private DmaapPublishers cut;
-
- @Rule
- public final ExpectedException expectedException = ExpectedException.none();
-
- @Before
- public void setUp() throws MalformedURLException, GeneralSecurityException {
- given(publisherFactory.createCambriaPublisher(anyString())).willReturn(cambriaPublisher);
- cut = DmaapPublishers.create(publisherFactory);
- }
-
- @Test
- public void getByStreamIdShouldUseCachedItem() throws IOException, GeneralSecurityException {
- // given
- String streamId = "sampleStream";
-
- // when
- CambriaBatchingPublisher firstPublisher = cut.getByStreamId(streamId);
- CambriaBatchingPublisher secondPublisher = cut.getByStreamId(streamId);
-
- // then
- verify(publisherFactory, times(1)).createCambriaPublisher(streamId);
- assertSame("should return same instance", firstPublisher, secondPublisher);
- }
-
- @Test
- public void getByStreamIdShouldHandleErrors() throws MalformedURLException, GeneralSecurityException {
- // given
- MalformedURLException exception = new MalformedURLException();
- given(publisherFactory.createCambriaPublisher(anyString())).willThrow(exception);
- expectedException.expect(allOf(
- instanceOf(UncheckedExecutionException.class),
- causeIsInstanceOf(exception.getClass())));
-
- // when
- cut.getByStreamId("a stream");
-
- // then
- // exception should have been thrown
- }
-
- @Test
- public void closeByStreamIdShouldCloseConnection() throws IOException, InterruptedException {
- // given
- String streamId = "sampleStream";
- given(cambriaPublisher.close(anyLong(), any(TimeUnit.class)))
- .willReturn(ImmutableList.of(new CambriaPublisher.message("p", "msg")));
-
- // when
- CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId);
- cut.closeByStreamId(streamId);
-
- // then
- assertSame("should return proper publisher", cambriaPublisher, cachedPublisher);
- verify(cambriaPublisher).close(20, TimeUnit.SECONDS);
- }
-
- @Test
- public void closeByStreamIdShouldHandleErrors() throws IOException, InterruptedException {
- // given
- String streamId = "sampleStream";
- given(cambriaPublisher.close(anyLong(), any(TimeUnit.class))).willThrow(IOException.class);
-
- // when
- CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId);
- cut.closeByStreamId(streamId);
-
- // then
- assertSame("should return proper publisher", cambriaPublisher, cachedPublisher);
- verify(cambriaPublisher).close(20, TimeUnit.SECONDS);
- }
-
- private Matcher<Exception> causeIsInstanceOf(final Class<?> clazz) {
- return new BaseMatcher<Exception>() {
- @Override
- public boolean matches(Object o) {
- return o instanceof Throwable && clazz.isInstance(((Throwable) o).getCause());
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("exception cause should be an instance of " + clazz.getName());
- }
- };
- }
-} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
index 973ee014..e211c12a 100644
--- a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
+++ b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
@@ -20,17 +20,17 @@
*/
package org.onap.dcae.commonFunction;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import com.google.gson.Gson;
+import java.util.concurrent.atomic.AtomicReference;
import org.json.JSONObject;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.util.List;
+import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -52,7 +52,7 @@ public class EventProcessorTest {
@Test
public void testLoad() {
//given
- EventProcessor ec = new EventProcessor();
+ EventProcessor ec = new EventProcessor(mock(EventPublisher.class));
ec.event = new org.json.JSONObject(ev);
//when
ec.overrideEvent();
@@ -65,7 +65,7 @@ public class EventProcessorTest {
@Test
public void shouldParseJsonEvents() throws ReflectiveOperationException {
//given
- EventProcessor eventProcessor = new EventProcessor();
+ EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class));
String event_json = "[{ \"filter\": {\"event.commonEventHeader.domain\":\"heartbeat\",\"VESversion\":\"v4\"},\"processors\":[" +
"{\"functionName\": \"concatenateValue\",\"args\":{\"field\":\"event.commonEventHeader.eventName\",\"concatenate\": [\"$event.commonEventHeader.domain\",\"$event.commonEventHeader.eventType\",\"$event.faultFields.alarmCondition\"], \"delimiter\":\"_\"}}" +
",{\"functionName\": \"addAttribute\",\"args\":{\"field\": \"event.heartbeatFields.heartbeatFieldsVersion\",\"value\": \"1.0\",\"fieldType\": \"number\"}}" +
@@ -84,32 +84,5 @@ public class EventProcessorTest {
assertThat(stringArgumentCaptor.getAllValues()).contains("concatenateValue", "addAttribute", "map");
}
- @Test
- public void shouldCreateDmaapPublisher() {
-
- //given
- EventPublisherHash eph = EventPublisherHash.getInstance();
- EventProcessor ec = new EventProcessor();
- ec.event = new org.json.JSONObject(ev);
- CommonStartup.cambriaConfigFile = "src/test/resources/testDmaapConfig_ip.json";
-
- //when
- CambriaBatchingPublisher pub = eph.getDmaapPublisher("sec_fault_ueb");
-
- //then
- assertNotNull(pub);
- }
-
- @Test
- public void shouldSendEventWithNoError() {
-
- EventPublisherHash eph = EventPublisherHash.getInstance();
- EventProcessor eventProcessor = new EventProcessor();
- eventProcessor.event = new org.json.JSONObject(ev);
- CommonStartup.cambriaConfigFile = "src/test/resources/testDmaapConfig_ip.json";
-
- //when
- eph.sendEvent(eventProcessor.event, "sec_fault_ueb");
- }
}
diff --git a/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java b/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java
index e0fd5a42..12428024 100644
--- a/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java
+++ b/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java
@@ -22,6 +22,7 @@ package org.onap.dcae.commonFunction;
import static java.util.Base64.getDecoder;
import static java.util.Base64.getEncoder;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.att.nsa.cmdLine.NsaCommandLineUtil;
@@ -43,6 +44,7 @@ import org.json.JSONObject;
import org.junit.Test;
import org.mockito.Mockito;
import org.onap.dcae.commonFunction.CommonStartup.QueueFullException;
+import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
import org.onap.dcae.restapi.RestfulCollectorServlet;
@@ -79,7 +81,7 @@ public class TestCommonStartup {
public void testParseStreamIdToStreamHashMapping() {
// given
CommonStartup.streamID = "fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling";
- EventProcessor eventProcessor = new EventProcessor();
+ EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class));
// when
Map<String, String[]> streamHashMapping = EventProcessor.streamidHash;
diff --git a/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java
new file mode 100644
index 00000000..5a94c662
--- /dev/null
+++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java
@@ -0,0 +1,114 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static io.vavr.API.List;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcae.commonFunction.event.publishing.DMaaPConfigurationParser.parseToDomainMapping;
+
+import io.vavr.collection.Map;
+import io.vavr.control.Try;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.junit.Test;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+public class DMaaPConfigurationParserTest {
+
+ @Test
+ public void testParseCredentialsForGen2() {
+ Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsGen2.json");
+ Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path);
+
+ PublisherConfig authCredentialsNulls = publisherConfigs.get().get("auth-credentials-null").getOrNull();
+ assertThat(authCredentialsNulls.userName().isEmpty()).isTrue();
+ assertThat(authCredentialsNulls.password().isEmpty()).isTrue();
+ assertThat(authCredentialsNulls.isSecured()).isFalse();
+
+ PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull();
+ assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser");
+ assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword");
+ assertThat(authCredentialsPresent.isSecured()).isTrue();
+
+ PublisherConfig authCredentialsKeysMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull();
+ assertThat(authCredentialsKeysMissing.userName().isEmpty()).isTrue();
+ assertThat(authCredentialsKeysMissing.password().isEmpty()).isTrue();
+ assertThat(authCredentialsKeysMissing.isSecured()).isFalse();
+ }
+
+
+ @Test
+ public void testParseCredentialsForLegacy() {
+ Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsLegacy.json");
+ Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path);
+
+ PublisherConfig authCredentialsNull = publisherConfigs.get().get("auth-credentials-null").getOrNull();
+ assertThat(authCredentialsNull.userName().isEmpty()).isTrue();
+ assertThat(authCredentialsNull.password().isEmpty()).isTrue();
+ assertThat(authCredentialsNull.isSecured()).isFalse();
+
+ PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull();
+ assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser");
+ assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword");
+ assertThat(authCredentialsPresent.isSecured()).isTrue();
+
+ PublisherConfig authCredentialsMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull();
+ assertThat(authCredentialsMissing.userName().isEmpty()).isTrue();
+ assertThat(authCredentialsMissing.password().isEmpty()).isTrue();
+ assertThat(authCredentialsMissing.isSecured()).isFalse();
+ }
+
+
+ @Test
+ public void testParseGen2() {
+ Path path = Paths.get("src/test/resources/testParseDMaaPGen2.json");
+ Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path);
+
+ PublisherConfig withEventsSegment = publisherConfigs.get().get("event-segments-with-port").getOrNull();
+ assertThat(withEventsSegment.destinations()).isEqualTo(List("UEBHOST:3904"));
+ assertThat(withEventsSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
+
+ PublisherConfig withOtherSegment = publisherConfigs.get().get("other-segments-without-ports").getOrNull();
+ assertThat(withOtherSegment.destinations()).isEqualTo(List("UEBHOST"));
+ assertThat(withOtherSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
+ }
+
+ @Test
+ public void testParseLegacy() {
+ Path exemplaryConfig = Paths.get("src/test/resources/testParseDMaaPLegacy.json");
+ Try<Map<String, PublisherConfig>> publisherConfigs = DMaaPConfigurationParser
+ .parseToDomainMapping(exemplaryConfig);
+
+ PublisherConfig urlFirstThenHosts = publisherConfigs.get().get("url-precedes-hosts").getOrNull();
+ assertThat(urlFirstThenHosts.destinations()).isEqualTo(List("127.0.0.1:3904"));
+ assertThat(urlFirstThenHosts.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
+
+ PublisherConfig urlKeyMissing = publisherConfigs.get().get("url-key-missing").getOrNull();
+ assertThat(urlKeyMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com"));
+ assertThat(urlKeyMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
+
+ PublisherConfig urlIsMissing = publisherConfigs.get().get("url-is-null").getOrNull();
+ assertThat(urlIsMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com"));
+ assertThat(urlIsMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
+ }
+} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java
index 81c6556b..bbe5079e 100644
--- a/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java
+++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java
@@ -17,74 +17,73 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.commonFunction;
+package org.onap.dcae.commonFunction.event.publishing;
+import static io.vavr.API.Option;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import java.io.IOException;
import org.json.JSONObject;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
-@RunWith(MockitoJUnitRunner.class)
-public class EventPublisherHashTest {
- private EventPublisherHash cut;
+public class DMaaPEventPublisherTest {
- @Mock
- private DmaapPublishers dmaapPublishers;
- @Mock
+ private static final String STREAM_ID = "sampleStreamId";
+
+ private DMaaPEventPublisher eventPublisher;
private CambriaBatchingPublisher cambriaPublisher;
+ private DMaaPPublishersCache DMaaPPublishersCache;
@Before
public void setUp() {
- given(dmaapPublishers.getByStreamId(anyString())).willReturn(cambriaPublisher);
-
- cut = new EventPublisherHash(dmaapPublishers);
+ cambriaPublisher = mock(CambriaBatchingPublisher.class);
+ DMaaPPublishersCache = mock(DMaaPPublishersCache.class);
+ when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher));
+ eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache, mock(Logger.class));
}
@Test
- public void sendEventShouldSendEventToATopic() throws Exception {
+ public void shouldSendEventToTopic() throws Exception {
// given
JSONObject event = new JSONObject("{}");
- final String streamId = "sampleStreamId";
// when
- cut.sendEvent(event, streamId);
+ eventPublisher.sendEvent(event, STREAM_ID);
// then
verify(cambriaPublisher).send("MyPartitionKey", event.toString());
}
@Test
- public void sendEventShouldRemoveUuid() throws Exception {
+ public void shouldRemoveInternalVESUIDBeforeSending() throws Exception {
// given
- JSONObject event = new JSONObject("{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}");
- final String streamId = "sampleStreamId";
+ JSONObject event = new JSONObject(
+ "{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}");
// when
- cut.sendEvent(event, streamId);
+ eventPublisher.sendEvent(event, STREAM_ID);
// then
verify(cambriaPublisher).send("MyPartitionKey", new JSONObject("{\"another\": 8}").toString());
}
@Test
- public void sendEventShouldCloseConnectionWhenExceptionOccurred() throws Exception {
+ public void shouldCloseConnectionWhenExceptionOccurred() throws Exception {
// given
JSONObject event = new JSONObject("{}");
- final String streamId = "sampleStreamId";
given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail"));
// when
- cut.sendEvent(event, streamId);
+ eventPublisher.sendEvent(event, STREAM_ID);
// then
- verify(dmaapPublishers).closeByStreamId(streamId);
+ verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID);
}
} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java
new file mode 100644
index 00000000..8dc69f62
--- /dev/null
+++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java
@@ -0,0 +1,126 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction.event.publishing;
+
+import static io.vavr.API.List;
+import static io.vavr.API.Map;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.dcae.commonFunction.event.publishing.DMaaPPublishersCache.CambriaPublishersCacheLoader;
+import org.onap.dcae.commonFunction.event.publishing.DMaaPPublishersCache.OnPublisherRemovalListener;
+
+
+public class DMaaPPublishersCacheTest {
+
+ private String streamId1;
+ private Map<String, PublisherConfig> dMaaPConfigs;
+
+ @Before
+ public void setUp() {
+ streamId1 = "sampleStream1";
+ dMaaPConfigs = Map("sampleStream1", new PublisherConfig(List("destination1"), "topic1"));
+ }
+
+ @Test
+ public void shouldReturnTheSameCachedInstanceOnConsecutiveRetrievals() {
+ // given
+ DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
+
+ // when
+ Option<CambriaBatchingPublisher> firstPublisher = dMaaPPublishersCache.getPublisher(streamId1);
+ Option<CambriaBatchingPublisher> secondPublisher = dMaaPPublishersCache.getPublisher(streamId1);
+
+ // then
+ assertSame("should return same instance", firstPublisher.get(), secondPublisher.get());
+ }
+
+ @Test
+ public void shouldCloseCambriaPublisherOnCacheInvalidate() throws IOException, InterruptedException {
+ // given
+ CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
+ CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
+ DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
+ new OnPublisherRemovalListener(),
+ dMaaPConfigs);
+ when(cacheLoaderMock.load(streamId1)).thenReturn(cambriaPublisherMock1);
+
+ // when
+ dMaaPPublishersCache.getPublisher(streamId1);
+ dMaaPPublishersCache.closePublisherFor(streamId1);
+
+ // then
+ verify(cambriaPublisherMock1).close(20, TimeUnit.SECONDS);
+
+ }
+
+ @Test
+ public void shouldReturnNoneIfThereIsNoDMaaPConfigurationForGivenStreamID() {
+ // given
+ DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
+
+ // then
+ assertTrue("should not exist", dMaaPPublishersCache.getPublisher("non-existing").isEmpty());
+ }
+
+
+ @Test
+ public void shouldCloseOnlyChangedPublishers() throws IOException, InterruptedException {
+ // given
+ CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
+ CambriaBatchingPublisher cambriaPublisherMock2 = mock(CambriaBatchingPublisher.class);
+ CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
+ String firstDomain = "domain1";
+ String secondDomain = "domain2";
+ Map<String, PublisherConfig> oldConfig = Map(firstDomain,
+ new PublisherConfig(List("destination1"), "topic1"),
+ secondDomain,
+ new PublisherConfig(List("destination2"), "topic2",
+ "user", "pass"));
+ Map<String, PublisherConfig> newConfig = Map(firstDomain, new PublisherConfig(List("destination1"), "topic1"),
+ secondDomain, new PublisherConfig(List("destination2"), "topic2"));
+ DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
+ new OnPublisherRemovalListener(),
+ oldConfig);
+ when(cacheLoaderMock.load(firstDomain)).thenReturn(cambriaPublisherMock1);
+ when(cacheLoaderMock.load(secondDomain)).thenReturn(cambriaPublisherMock2);
+
+ dMaaPPublishersCache.getPublisher(firstDomain);
+ dMaaPPublishersCache.getPublisher(secondDomain);
+
+ // when
+ dMaaPPublishersCache.reconfigure(newConfig);
+
+ // then
+ verify(cambriaPublisherMock2).close(20, TimeUnit.SECONDS);
+ verifyZeroInteractions(cambriaPublisherMock1);
+ }
+} \ No newline at end of file