diff options
Diffstat (limited to 'src/test/java/org/onap/dcae/commonFunction')
-rw-r--r-- | src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java | 144 | ||||
-rw-r--r-- | src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java | 35 | ||||
-rw-r--r-- | src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java | 4 | ||||
-rw-r--r-- | src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java | 114 | ||||
-rw-r--r-- | src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java (renamed from src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java) | 47 | ||||
-rw-r--r-- | src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java | 126 |
6 files changed, 270 insertions, 200 deletions
diff --git a/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java b/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java deleted file mode 100644 index f4955ac8..00000000 --- a/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.commonFunction; - -import static org.hamcrest.CoreMatchers.allOf; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.junit.Assert.assertSame; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaPublisher; -import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.UncheckedExecutionException; -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.concurrent.TimeUnit; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - - -@RunWith(MockitoJUnitRunner.class) -public class DmaapPublishersTest { - - @Mock - private CambriaPublisherFactory publisherFactory; - @Mock - private CambriaBatchingPublisher cambriaPublisher; - private DmaapPublishers cut; - - @Rule - public final ExpectedException expectedException = ExpectedException.none(); - - @Before - public void setUp() throws MalformedURLException, GeneralSecurityException { - given(publisherFactory.createCambriaPublisher(anyString())).willReturn(cambriaPublisher); - cut = DmaapPublishers.create(publisherFactory); - } - - @Test - public void getByStreamIdShouldUseCachedItem() throws IOException, GeneralSecurityException { - // given - String streamId = "sampleStream"; - - // when - CambriaBatchingPublisher firstPublisher = cut.getByStreamId(streamId); - CambriaBatchingPublisher secondPublisher = cut.getByStreamId(streamId); - - // then - verify(publisherFactory, times(1)).createCambriaPublisher(streamId); - assertSame("should return same instance", firstPublisher, secondPublisher); - } - - @Test - public void getByStreamIdShouldHandleErrors() throws MalformedURLException, GeneralSecurityException { - // given - MalformedURLException exception = new MalformedURLException(); - given(publisherFactory.createCambriaPublisher(anyString())).willThrow(exception); - expectedException.expect(allOf( - instanceOf(UncheckedExecutionException.class), - causeIsInstanceOf(exception.getClass()))); - - // when - cut.getByStreamId("a stream"); - - // then - // exception should have been thrown - } - - @Test - public void closeByStreamIdShouldCloseConnection() throws IOException, InterruptedException { - // given - String streamId = "sampleStream"; - given(cambriaPublisher.close(anyLong(), any(TimeUnit.class))) - .willReturn(ImmutableList.of(new CambriaPublisher.message("p", "msg"))); - - // when - CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId); - cut.closeByStreamId(streamId); - - // then - assertSame("should return proper publisher", cambriaPublisher, cachedPublisher); - verify(cambriaPublisher).close(20, TimeUnit.SECONDS); - } - - @Test - public void closeByStreamIdShouldHandleErrors() throws IOException, InterruptedException { - // given - String streamId = "sampleStream"; - given(cambriaPublisher.close(anyLong(), any(TimeUnit.class))).willThrow(IOException.class); - - // when - CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId); - cut.closeByStreamId(streamId); - - // then - assertSame("should return proper publisher", cambriaPublisher, cachedPublisher); - verify(cambriaPublisher).close(20, TimeUnit.SECONDS); - } - - private Matcher<Exception> causeIsInstanceOf(final Class<?> clazz) { - return new BaseMatcher<Exception>() { - @Override - public boolean matches(Object o) { - return o instanceof Throwable && clazz.isInstance(((Throwable) o).getCause()); - } - - @Override - public void describeTo(Description description) { - description.appendText("exception cause should be an instance of " + clazz.getName()); - } - }; - } -}
\ No newline at end of file diff --git a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java index 973ee014..e211c12a 100644 --- a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java +++ b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java @@ -20,17 +20,17 @@ */ package org.onap.dcae.commonFunction; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.google.gson.Gson; +import java.util.concurrent.atomic.AtomicReference; import org.json.JSONObject; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import java.util.List; +import org.onap.dcae.commonFunction.event.publishing.EventPublisher; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -52,7 +52,7 @@ public class EventProcessorTest { @Test public void testLoad() { //given - EventProcessor ec = new EventProcessor(); + EventProcessor ec = new EventProcessor(mock(EventPublisher.class)); ec.event = new org.json.JSONObject(ev); //when ec.overrideEvent(); @@ -65,7 +65,7 @@ public class EventProcessorTest { @Test public void shouldParseJsonEvents() throws ReflectiveOperationException { //given - EventProcessor eventProcessor = new EventProcessor(); + EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class)); String event_json = "[{ \"filter\": {\"event.commonEventHeader.domain\":\"heartbeat\",\"VESversion\":\"v4\"},\"processors\":[" + "{\"functionName\": \"concatenateValue\",\"args\":{\"field\":\"event.commonEventHeader.eventName\",\"concatenate\": [\"$event.commonEventHeader.domain\",\"$event.commonEventHeader.eventType\",\"$event.faultFields.alarmCondition\"], \"delimiter\":\"_\"}}" + ",{\"functionName\": \"addAttribute\",\"args\":{\"field\": \"event.heartbeatFields.heartbeatFieldsVersion\",\"value\": \"1.0\",\"fieldType\": \"number\"}}" + @@ -84,32 +84,5 @@ public class EventProcessorTest { assertThat(stringArgumentCaptor.getAllValues()).contains("concatenateValue", "addAttribute", "map"); } - @Test - public void shouldCreateDmaapPublisher() { - - //given - EventPublisherHash eph = EventPublisherHash.getInstance(); - EventProcessor ec = new EventProcessor(); - ec.event = new org.json.JSONObject(ev); - CommonStartup.cambriaConfigFile = "src/test/resources/testDmaapConfig_ip.json"; - - //when - CambriaBatchingPublisher pub = eph.getDmaapPublisher("sec_fault_ueb"); - - //then - assertNotNull(pub); - } - - @Test - public void shouldSendEventWithNoError() { - - EventPublisherHash eph = EventPublisherHash.getInstance(); - EventProcessor eventProcessor = new EventProcessor(); - eventProcessor.event = new org.json.JSONObject(ev); - CommonStartup.cambriaConfigFile = "src/test/resources/testDmaapConfig_ip.json"; - - //when - eph.sendEvent(eventProcessor.event, "sec_fault_ueb"); - } } diff --git a/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java b/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java index e0fd5a42..12428024 100644 --- a/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java +++ b/src/test/java/org/onap/dcae/commonFunction/TestCommonStartup.java @@ -22,6 +22,7 @@ package org.onap.dcae.commonFunction; import static java.util.Base64.getDecoder; import static java.util.Base64.getEncoder; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.att.nsa.cmdLine.NsaCommandLineUtil; @@ -43,6 +44,7 @@ import org.json.JSONObject; import org.junit.Test; import org.mockito.Mockito; import org.onap.dcae.commonFunction.CommonStartup.QueueFullException; +import org.onap.dcae.commonFunction.event.publishing.EventPublisher; import org.onap.dcae.restapi.RestfulCollectorServlet; @@ -79,7 +81,7 @@ public class TestCommonStartup { public void testParseStreamIdToStreamHashMapping() { // given CommonStartup.streamID = "fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling"; - EventProcessor eventProcessor = new EventProcessor(); + EventProcessor eventProcessor = new EventProcessor(mock(EventPublisher.class)); // when Map<String, String[]> streamHashMapping = EventProcessor.streamidHash; diff --git a/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java new file mode 100644 index 00000000..5a94c662 --- /dev/null +++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPConfigurationParserTest.java @@ -0,0 +1,114 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.List; +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcae.commonFunction.event.publishing.DMaaPConfigurationParser.parseToDomainMapping; + +import io.vavr.collection.Map; +import io.vavr.control.Try; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.junit.Test; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +public class DMaaPConfigurationParserTest { + + @Test + public void testParseCredentialsForGen2() { + Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsGen2.json"); + Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path); + + PublisherConfig authCredentialsNulls = publisherConfigs.get().get("auth-credentials-null").getOrNull(); + assertThat(authCredentialsNulls.userName().isEmpty()).isTrue(); + assertThat(authCredentialsNulls.password().isEmpty()).isTrue(); + assertThat(authCredentialsNulls.isSecured()).isFalse(); + + PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull(); + assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser"); + assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword"); + assertThat(authCredentialsPresent.isSecured()).isTrue(); + + PublisherConfig authCredentialsKeysMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull(); + assertThat(authCredentialsKeysMissing.userName().isEmpty()).isTrue(); + assertThat(authCredentialsKeysMissing.password().isEmpty()).isTrue(); + assertThat(authCredentialsKeysMissing.isSecured()).isFalse(); + } + + + @Test + public void testParseCredentialsForLegacy() { + Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsLegacy.json"); + Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path); + + PublisherConfig authCredentialsNull = publisherConfigs.get().get("auth-credentials-null").getOrNull(); + assertThat(authCredentialsNull.userName().isEmpty()).isTrue(); + assertThat(authCredentialsNull.password().isEmpty()).isTrue(); + assertThat(authCredentialsNull.isSecured()).isFalse(); + + PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull(); + assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser"); + assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword"); + assertThat(authCredentialsPresent.isSecured()).isTrue(); + + PublisherConfig authCredentialsMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull(); + assertThat(authCredentialsMissing.userName().isEmpty()).isTrue(); + assertThat(authCredentialsMissing.password().isEmpty()).isTrue(); + assertThat(authCredentialsMissing.isSecured()).isFalse(); + } + + + @Test + public void testParseGen2() { + Path path = Paths.get("src/test/resources/testParseDMaaPGen2.json"); + Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path); + + PublisherConfig withEventsSegment = publisherConfigs.get().get("event-segments-with-port").getOrNull(); + assertThat(withEventsSegment.destinations()).isEqualTo(List("UEBHOST:3904")); + assertThat(withEventsSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + + PublisherConfig withOtherSegment = publisherConfigs.get().get("other-segments-without-ports").getOrNull(); + assertThat(withOtherSegment.destinations()).isEqualTo(List("UEBHOST")); + assertThat(withOtherSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + } + + @Test + public void testParseLegacy() { + Path exemplaryConfig = Paths.get("src/test/resources/testParseDMaaPLegacy.json"); + Try<Map<String, PublisherConfig>> publisherConfigs = DMaaPConfigurationParser + .parseToDomainMapping(exemplaryConfig); + + PublisherConfig urlFirstThenHosts = publisherConfigs.get().get("url-precedes-hosts").getOrNull(); + assertThat(urlFirstThenHosts.destinations()).isEqualTo(List("127.0.0.1:3904")); + assertThat(urlFirstThenHosts.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + + PublisherConfig urlKeyMissing = publisherConfigs.get().get("url-key-missing").getOrNull(); + assertThat(urlKeyMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com")); + assertThat(urlKeyMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + + PublisherConfig urlIsMissing = publisherConfigs.get().get("url-is-null").getOrNull(); + assertThat(urlIsMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com")); + assertThat(urlIsMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV"); + } +}
\ No newline at end of file diff --git a/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java index 81c6556b..bbe5079e 100644 --- a/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java +++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPEventPublisherTest.java @@ -17,74 +17,73 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.commonFunction; +package org.onap.dcae.commonFunction.event.publishing; +import static io.vavr.API.Option; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.att.nsa.cambria.client.CambriaBatchingPublisher; import java.io.IOException; import org.json.JSONObject; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; -@RunWith(MockitoJUnitRunner.class) -public class EventPublisherHashTest { - private EventPublisherHash cut; +public class DMaaPEventPublisherTest { - @Mock - private DmaapPublishers dmaapPublishers; - @Mock + private static final String STREAM_ID = "sampleStreamId"; + + private DMaaPEventPublisher eventPublisher; private CambriaBatchingPublisher cambriaPublisher; + private DMaaPPublishersCache DMaaPPublishersCache; @Before public void setUp() { - given(dmaapPublishers.getByStreamId(anyString())).willReturn(cambriaPublisher); - - cut = new EventPublisherHash(dmaapPublishers); + cambriaPublisher = mock(CambriaBatchingPublisher.class); + DMaaPPublishersCache = mock(DMaaPPublishersCache.class); + when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher)); + eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache, mock(Logger.class)); } @Test - public void sendEventShouldSendEventToATopic() throws Exception { + public void shouldSendEventToTopic() throws Exception { // given JSONObject event = new JSONObject("{}"); - final String streamId = "sampleStreamId"; // when - cut.sendEvent(event, streamId); + eventPublisher.sendEvent(event, STREAM_ID); // then verify(cambriaPublisher).send("MyPartitionKey", event.toString()); } @Test - public void sendEventShouldRemoveUuid() throws Exception { + public void shouldRemoveInternalVESUIDBeforeSending() throws Exception { // given - JSONObject event = new JSONObject("{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}"); - final String streamId = "sampleStreamId"; + JSONObject event = new JSONObject( + "{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}"); // when - cut.sendEvent(event, streamId); + eventPublisher.sendEvent(event, STREAM_ID); // then verify(cambriaPublisher).send("MyPartitionKey", new JSONObject("{\"another\": 8}").toString()); } @Test - public void sendEventShouldCloseConnectionWhenExceptionOccurred() throws Exception { + public void shouldCloseConnectionWhenExceptionOccurred() throws Exception { // given JSONObject event = new JSONObject("{}"); - final String streamId = "sampleStreamId"; given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail")); // when - cut.sendEvent(event, streamId); + eventPublisher.sendEvent(event, STREAM_ID); // then - verify(dmaapPublishers).closeByStreamId(streamId); + verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID); } }
\ No newline at end of file diff --git a/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java new file mode 100644 index 00000000..8dc69f62 --- /dev/null +++ b/src/test/java/org/onap/dcae/commonFunction/event/publishing/DMaaPPublishersCacheTest.java @@ -0,0 +1,126 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction.event.publishing; + +import static io.vavr.API.List; +import static io.vavr.API.Map; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.onap.dcae.commonFunction.event.publishing.DMaaPPublishersCache.CambriaPublishersCacheLoader; +import org.onap.dcae.commonFunction.event.publishing.DMaaPPublishersCache.OnPublisherRemovalListener; + + +public class DMaaPPublishersCacheTest { + + private String streamId1; + private Map<String, PublisherConfig> dMaaPConfigs; + + @Before + public void setUp() { + streamId1 = "sampleStream1"; + dMaaPConfigs = Map("sampleStream1", new PublisherConfig(List("destination1"), "topic1")); + } + + @Test + public void shouldReturnTheSameCachedInstanceOnConsecutiveRetrievals() { + // given + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs); + + // when + Option<CambriaBatchingPublisher> firstPublisher = dMaaPPublishersCache.getPublisher(streamId1); + Option<CambriaBatchingPublisher> secondPublisher = dMaaPPublishersCache.getPublisher(streamId1); + + // then + assertSame("should return same instance", firstPublisher.get(), secondPublisher.get()); + } + + @Test + public void shouldCloseCambriaPublisherOnCacheInvalidate() throws IOException, InterruptedException { + // given + CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class); + CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class); + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock, + new OnPublisherRemovalListener(), + dMaaPConfigs); + when(cacheLoaderMock.load(streamId1)).thenReturn(cambriaPublisherMock1); + + // when + dMaaPPublishersCache.getPublisher(streamId1); + dMaaPPublishersCache.closePublisherFor(streamId1); + + // then + verify(cambriaPublisherMock1).close(20, TimeUnit.SECONDS); + + } + + @Test + public void shouldReturnNoneIfThereIsNoDMaaPConfigurationForGivenStreamID() { + // given + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs); + + // then + assertTrue("should not exist", dMaaPPublishersCache.getPublisher("non-existing").isEmpty()); + } + + + @Test + public void shouldCloseOnlyChangedPublishers() throws IOException, InterruptedException { + // given + CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class); + CambriaBatchingPublisher cambriaPublisherMock2 = mock(CambriaBatchingPublisher.class); + CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class); + String firstDomain = "domain1"; + String secondDomain = "domain2"; + Map<String, PublisherConfig> oldConfig = Map(firstDomain, + new PublisherConfig(List("destination1"), "topic1"), + secondDomain, + new PublisherConfig(List("destination2"), "topic2", + "user", "pass")); + Map<String, PublisherConfig> newConfig = Map(firstDomain, new PublisherConfig(List("destination1"), "topic1"), + secondDomain, new PublisherConfig(List("destination2"), "topic2")); + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock, + new OnPublisherRemovalListener(), + oldConfig); + when(cacheLoaderMock.load(firstDomain)).thenReturn(cambriaPublisherMock1); + when(cacheLoaderMock.load(secondDomain)).thenReturn(cambriaPublisherMock2); + + dMaaPPublishersCache.getPublisher(firstDomain); + dMaaPPublishersCache.getPublisher(secondDomain); + + // when + dMaaPPublishersCache.reconfigure(newConfig); + + // then + verify(cambriaPublisherMock2).close(20, TimeUnit.SECONDS); + verifyZeroInteractions(cambriaPublisherMock1); + } +}
\ No newline at end of file |