aboutsummaryrefslogtreecommitdiffstats
path: root/src/test/java/org/onap/dcae/common
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/java/org/onap/dcae/common')
-rw-r--r--src/test/java/org/onap/dcae/common/EventSenderTest.java35
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java45
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java126
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java126
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java54
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java116
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/PublisherTest.java78
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java156
-rw-r--r--src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java110
9 files changed, 528 insertions, 318 deletions
diff --git a/src/test/java/org/onap/dcae/common/EventSenderTest.java b/src/test/java/org/onap/dcae/common/EventSenderTest.java
index 454cfb52..6d508d0a 100644
--- a/src/test/java/org/onap/dcae/common/EventSenderTest.java
+++ b/src/test/java/org/onap/dcae/common/EventSenderTest.java
@@ -1,9 +1,9 @@
/*
* ============LICENSE_START=======================================================
- * PROJECT
+ * VES Collector
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018,2020 Nokia. All rights reserved.
+ * Copyright (C) 2018-2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.onap.dcae.common.model.StndDefinedNamespaceParameterNotDefinedException;
import org.onap.dcae.common.model.VesEvent;
import org.onap.dcae.common.publishing.DMaaPEventPublisher;
+import org.onap.dcae.restapi.EventValidatorException;
import java.io.IOException;
import java.util.List;
@@ -53,31 +54,18 @@ public class EventSenderTest {
List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves7_valid_event.json");
// when
- eventSender.send(eventToSend);
+ assertThatExceptionOfType(EventValidatorException.class)
+ .isThrownBy(() -> eventSender.send(eventToSend));
// then
verifyThatEventWasNotSendAtStream();
}
@Test
- public void shouldSendEventAtStreamsAssignedToEventDomain() throws IOException {
- // given
- EventSender eventSender = givenConfiguredEventSender(HashMap.of("fault", new String[]{"ves-fault", "fault-ves"}));
- List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves7_valid_event.json");
-
- // when
- eventSender.send(eventToSend);
-
- //then
- verifyThatEventWasSendAtStream("ves-fault");
- verifyThatEventWasSendAtStream("fault-ves");
- }
-
- @Test
public void shouldSendStdDefinedEventAtStreamAssignedToEventDomain() throws IOException {
// given
EventSender eventSender = givenConfiguredEventSender(
- HashMap.of("3GPP-FaultSupervision", new String[]{"ves-3gpp-fault-supervision"})
+ HashMap.of("3GPP-FaultSupervision", "ves-3gpp-fault-supervision")
);
List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves_stdnDefined_valid.json");
@@ -95,7 +83,8 @@ public class EventSenderTest {
List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves_stdnDefined_valid.json");
// when
- eventSender.send(eventToSend);
+ assertThatExceptionOfType(EventValidatorException.class)
+ .isThrownBy(() -> eventSender.send(eventToSend));
// then
verifyThatEventWasNotSendAtStream();
@@ -122,7 +111,7 @@ public class EventSenderTest {
return givenEventToSend(event);
}
- private EventSender givenConfiguredEventSender(io.vavr.collection.Map<String, String[]> streamIds) {
+ private EventSender givenConfiguredEventSender(io.vavr.collection.Map<String, String> streamIds) {
return new EventSender(eventPublisher, streamIds);
}
@@ -132,10 +121,10 @@ public class EventSenderTest {
}
private void verifyThatEventWasNotSendAtStream() {
- verify(eventPublisher,never()).sendEvent(any(),any());
+ verify(eventPublisher,never()).sendEvent(any(),any());
}
private void verifyThatEventWasSendAtStream(String s) {
- verify(eventPublisher).sendEvent(any(), eq(s));
- }
+ verify(eventPublisher).sendEvent(any(), eq(s));
+ }
}
diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java
index 923aae02..9aaeb287 100644
--- a/src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java
+++ b/src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java
@@ -3,7 +3,7 @@
* org.onap.dcaegen2.collectors.ves
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018,2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -56,29 +56,6 @@ public class DMaaPConfigurationParserTest {
assertThat(authCredentialsKeysMissing.isSecured()).isFalse();
}
-
- @Test
- public void testParseCredentialsForLegacy() {
- Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsLegacy.json");
- Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path);
-
- PublisherConfig authCredentialsNull = publisherConfigs.get().get("auth-credentials-null").getOrNull();
- assertThat(authCredentialsNull.userName().isEmpty()).isTrue();
- assertThat(authCredentialsNull.password().isEmpty()).isTrue();
- assertThat(authCredentialsNull.isSecured()).isFalse();
-
- PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull();
- assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser");
- assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword");
- assertThat(authCredentialsPresent.isSecured()).isTrue();
-
- PublisherConfig authCredentialsMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull();
- assertThat(authCredentialsMissing.userName().isEmpty()).isTrue();
- assertThat(authCredentialsMissing.password().isEmpty()).isTrue();
- assertThat(authCredentialsMissing.isSecured()).isFalse();
- }
-
-
@Test
public void testParseGen2() {
Path path = Paths.get("src/test/resources/testParseDMaaPGen2.json");
@@ -93,22 +70,4 @@ public class DMaaPConfigurationParserTest {
assertThat(withOtherSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
}
- @Test
- public void testParseLegacy() {
- Path exemplaryConfig = Paths.get("src/test/resources/testParseDMaaPLegacy.json");
- Try<Map<String, PublisherConfig>> publisherConfigs = DMaaPConfigurationParser
- .parseToDomainMapping(exemplaryConfig);
-
- PublisherConfig urlFirstThenHosts = publisherConfigs.get().get("url-precedes-hosts").getOrNull();
- assertThat(urlFirstThenHosts.destinations()).isEqualTo(List("127.0.0.1:3904"));
- assertThat(urlFirstThenHosts.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
-
- PublisherConfig urlKeyMissing = publisherConfigs.get().get("url-key-missing").getOrNull();
- assertThat(urlKeyMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com"));
- assertThat(urlKeyMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
-
- PublisherConfig urlIsMissing = publisherConfigs.get().get("url-is-null").getOrNull();
- assertThat(urlIsMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com"));
- assertThat(urlIsMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
- }
-} \ No newline at end of file
+}
diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java
deleted file mode 100644
index e4b6fd91..00000000
--- a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2018,2020 Nokia. All rights reserved.
- * Copyright (C) 2020 AT&T. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.common.publishing;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import org.json.JSONObject;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.dcae.common.model.VesEvent;
-
-import java.io.IOException;
-
-import static io.vavr.API.Option;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class DMaaPEventPublisherTest {
-
- private static final String STREAM_ID = "sampleStreamId";
-
- private static final JSONObject EXPECTED_EVENT =
- new JSONObject(
- "{\"VESversion\":\"v7\",\"event\":{"
- + "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,"
- + "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\","
- + "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\","
- + "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\","
- + "\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,"
- + "\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\","
- + "\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\","
- + "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}");
-
- private static final String PARTITION = "dns01cmd004";
-
- private DMaaPEventPublisher eventPublisher;
- private CambriaBatchingPublisher cambriaPublisher;
- private DMaaPPublishersCache DMaaPPublishersCache;
-
- @Before
- public void setUp() {
- cambriaPublisher = mock(CambriaBatchingPublisher.class);
- DMaaPPublishersCache = mock(DMaaPPublishersCache.class);
- when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher));
- eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache);
- }
-
- @Test
- public void shouldSendEventToTopic() throws Exception {
- // when
- eventPublisher.sendEvent(givenVesEventWithoutVESuniqueIdField(), STREAM_ID);
-
- // then
- verify(cambriaPublisher).send(PARTITION, EXPECTED_EVENT.toString());
- }
-
- @Test
- public void shouldRemoveInternalVESUIDBeforeSending() throws Exception {
- // when
- eventPublisher.sendEvent(givenVesEventWithVESUniqueIdField(), STREAM_ID);
-
- // then
- verify(cambriaPublisher).send(PARTITION, EXPECTED_EVENT.toString());
- }
-
- @Test
- public void shouldCloseConnectionWhenExceptionOccurred() throws Exception {
- // given
- given(cambriaPublisher.send(anyString(), anyString()))
- .willThrow(new IOException("Expected exception - test case scenario!"));
-
- // when
- eventPublisher.sendEvent(givenVesEventWithVESUniqueIdField(), STREAM_ID);
-
- // then
- verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID);
- }
-
- private VesEvent givenVesEventWithVESUniqueIdField() {
- return new VesEvent(
- new JSONObject(
- "{\"VESversion\":\"v7\",\"VESuniqueId\":\"fd69d432-5cd5-4c15-9d34-407c81c61c6a-0\"," +
- "\"event\":{" +
- "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," +
- "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," +
- "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," +
- "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\"," +
- "\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312," +
- "\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," +
- "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"));
- }
-
- private VesEvent givenVesEventWithoutVESuniqueIdField() {
- return new VesEvent(
- new JSONObject(
- "{\"VESversion\":\"v7\"," +
- "\"event\":{" +
- "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," +
- "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," +
- "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," +
- "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\"," +
- "\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312," +
- "\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," +
- "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"));
- }
-}
diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java
deleted file mode 100644
index f4dbe190..00000000
--- a/src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.common.publishing;
-
-import static io.vavr.API.List;
-import static io.vavr.API.Map;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import io.vavr.collection.Map;
-import io.vavr.control.Option;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.dcae.common.publishing.DMaaPPublishersCache.CambriaPublishersCacheLoader;
-import org.onap.dcae.common.publishing.DMaaPPublishersCache.OnPublisherRemovalListener;
-
-
-public class DMaaPPublishersCacheTest {
-
- private String streamId1;
- private Map<String, PublisherConfig> dMaaPConfigs;
-
- @Before
- public void setUp() {
- streamId1 = "sampleStream1";
- dMaaPConfigs = Map("sampleStream1", new PublisherConfig(List("destination1"), "topic1"));
- }
-
- @Test
- public void shouldReturnTheSameCachedInstanceOnConsecutiveRetrievals() {
- // given
- DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
-
- // when
- Option<CambriaBatchingPublisher> firstPublisher = dMaaPPublishersCache.getPublisher(streamId1);
- Option<CambriaBatchingPublisher> secondPublisher = dMaaPPublishersCache.getPublisher(streamId1);
-
- // then
- assertSame("should return same instance", firstPublisher.get(), secondPublisher.get());
- }
-
- @Test
- public void shouldCloseCambriaPublisherOnCacheInvalidate() throws IOException, InterruptedException {
- // given
- CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
- CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
- DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
- new OnPublisherRemovalListener(),
- dMaaPConfigs);
- when(cacheLoaderMock.load(streamId1)).thenReturn(cambriaPublisherMock1);
-
- // when
- dMaaPPublishersCache.getPublisher(streamId1);
- dMaaPPublishersCache.closePublisherFor(streamId1);
-
- // then
- verify(cambriaPublisherMock1).close(20, TimeUnit.SECONDS);
-
- }
-
- @Test
- public void shouldReturnNoneIfThereIsNoDMaaPConfigurationForGivenStreamID() {
- // given
- DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
-
- // then
- assertTrue("should not exist", dMaaPPublishersCache.getPublisher("non-existing").isEmpty());
- }
-
-
- @Test
- public void shouldCloseOnlyChangedPublishers() throws IOException, InterruptedException {
- // given
- CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
- CambriaBatchingPublisher cambriaPublisherMock2 = mock(CambriaBatchingPublisher.class);
- CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
- String firstDomain = "domain1";
- String secondDomain = "domain2";
- Map<String, PublisherConfig> oldConfig = Map(firstDomain,
- new PublisherConfig(List("destination1"), "topic1"),
- secondDomain,
- new PublisherConfig(List("destination2"), "topic2",
- "user", "pass"));
- Map<String, PublisherConfig> newConfig = Map(firstDomain, new PublisherConfig(List("destination1"), "topic1"),
- secondDomain, new PublisherConfig(List("destination2"), "topic2"));
- DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
- new OnPublisherRemovalListener(),
- oldConfig);
- when(cacheLoaderMock.load(firstDomain)).thenReturn(cambriaPublisherMock1);
- when(cacheLoaderMock.load(secondDomain)).thenReturn(cambriaPublisherMock2);
-
- dMaaPPublishersCache.getPublisher(firstDomain);
- dMaaPPublishersCache.getPublisher(secondDomain);
-
- // when
- dMaaPPublishersCache.reconfigure(newConfig);
-
- // then
- verify(cambriaPublisherMock2).close(20, TimeUnit.SECONDS);
- verifyZeroInteractions(cambriaPublisherMock1);
- }
-} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java b/src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java
new file mode 100644
index 00000000..9ece10b5
--- /dev/null
+++ b/src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START====================================
+ * VES Collector
+ * =========================================================
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcae.common.publishing;
+
+import org.testcontainers.containers.DockerComposeContainer;
+
+import java.io.File;
+import java.net.URL;
+
+final class DMaapContainer {
+ private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
+ private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(MR_COMPOSE_RESOURCE_NAME);
+ static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
+ static final String DMAAP_SERVICE_NAME = "onap-dmaap";
+
+ private DMaapContainer() {}
+
+
+ public static DockerComposeContainer createContainerInstance(){
+ return new DockerComposeContainer(
+ new File(DOCKER_COMPOSE_FILE_PATH))
+ .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)
+ .withLocalCompose(true);
+ }
+
+
+
+ private static String getDockerComposeFilePath(String resourceName) {
+ URL resource = DMaapContainer.class.getClassLoader()
+ .getResource(resourceName);
+
+ if (resource != null) return resource.getFile();
+ else throw new RuntimeException(String
+ .format("File %s does not exist", resourceName));
+ }
+}
diff --git a/src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java b/src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java
new file mode 100644
index 00000000..0e5ae908
--- /dev/null
+++ b/src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java
@@ -0,0 +1,116 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.onap.dcae.common.model.BackwardsCompatibilityException;
+import org.onap.dcae.common.model.InternalException;
+import org.onap.dcae.common.model.PayloadToLargeException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.springframework.http.HttpStatus;
+
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.onap.dcae.ApplicationSettings.responseCompatibility;
+import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus;
+
+class MessageRouterHttpStatusMapperTest {
+
+ public static final String BACKWARDS_COMPATIBILITY = "v7.2";
+ public static final String BACKWARDS_COMPATIBILITY_NONE = "NONE";
+
+ @Test
+ void ves_shouldResponse202() {
+ //given
+ responseCompatibility = BACKWARDS_COMPATIBILITY;
+ MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+ when(messageRouterPublishResponse.successful()).thenReturn(true);
+
+ //when
+ HttpStatus httpStatusResponse = getHttpStatus(messageRouterPublishResponse);
+
+ //then
+ assertSame(HttpStatus.ACCEPTED, httpStatusResponse);
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = HttpStatus.class,
+ names = {"NOT_FOUND", "REQUEST_TIMEOUT", "TOO_MANY_REQUESTS", "INTERNAL_SERVER_ERROR", "BAD_GATEWAY",
+ "SERVICE_UNAVAILABLE", "GATEWAY_TIMEOUT","PAYLOAD_TOO_LARGE"}
+ )
+ void ves_shouldMapErrorsToBackwardsCompatibility(HttpStatus httpStatus) {
+ //given
+ responseCompatibility = BACKWARDS_COMPATIBILITY;
+ MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+ when(messageRouterPublishResponse.failReason()).thenReturn(httpStatus.toString());
+
+ //when
+ //then
+ assertThrows(BackwardsCompatibilityException.class,()->getHttpStatus(messageRouterPublishResponse));
+ }
+
+ @Test
+ void ves_shouldResponse200WhenBackwardsCompatibilityIsNone() {
+ //given
+ responseCompatibility = BACKWARDS_COMPATIBILITY_NONE;
+ MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+ when(messageRouterPublishResponse.successful()).thenReturn(true);
+
+ //when
+ HttpStatus httpStatusResponse = getHttpStatus(messageRouterPublishResponse);
+
+ //then
+ assertSame(HttpStatus.OK, httpStatusResponse);
+ }
+
+ @Test
+ void ves_shouldHandleError413WhenBackwardsCompatibilityIsNone() {
+ //given
+ responseCompatibility = BACKWARDS_COMPATIBILITY_NONE;
+ MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+ when(messageRouterPublishResponse.failReason()).thenReturn(HttpStatus.PAYLOAD_TOO_LARGE.toString());
+
+ //when
+ //then
+ assertThrows(PayloadToLargeException.class,()->getHttpStatus(messageRouterPublishResponse));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = HttpStatus.class,
+ names = {"NOT_FOUND", "REQUEST_TIMEOUT", "TOO_MANY_REQUESTS", "INTERNAL_SERVER_ERROR", "BAD_GATEWAY",
+ "SERVICE_UNAVAILABLE", "GATEWAY_TIMEOUT"}
+ )
+ void ves_shouldMapErrorsTo503WhenBackwardsCompatibilityIsNone(HttpStatus httpStatus) {
+ //given
+ responseCompatibility = BACKWARDS_COMPATIBILITY_NONE;
+ MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+ when(messageRouterPublishResponse.failReason()).thenReturn(httpStatus.toString());
+
+ //when
+ //then
+ assertThrows(InternalException.class,()->getHttpStatus(messageRouterPublishResponse));
+ }
+}
diff --git a/src/test/java/org/onap/dcae/common/publishing/PublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/PublisherTest.java
new file mode 100644
index 00000000..f269b942
--- /dev/null
+++ b/src/test/java/org/onap/dcae/common/publishing/PublisherTest.java
@@ -0,0 +1,78 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.google.gson.JsonElement;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.time.Duration;
+
+import static org.onap.dcae.common.publishing.DMaapContainer.createContainerInstance;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.getAsJsonElements;
+
+
+@Testcontainers
+public class PublisherTest {
+
+ @Container
+ private final DockerComposeContainer CONTAINER = createContainerInstance();
+
+ @Test
+ void publishEvent_shouldSuccessfullyPublishSingleMessage() {
+ //given
+ final Publisher publisher = new Publisher();
+ final String simpleEvent = "{\"message\":\"message1\"}";
+ final List<String> twoJsonMessages = List.of(simpleEvent);
+ final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(twoJsonMessages, createPublishConfig());
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(Duration.ofSeconds(10));
+ }
+
+
+ private Option<PublisherConfig> createPublishConfig() {
+ List<String> desc = List.of("127.0.0.1:3904");
+ PublisherConfig conf = new PublisherConfig(desc, "topic");
+ return Option.of(conf);
+ }
+
+ private MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) {
+ return ImmutableMessageRouterPublishResponse
+ .builder()
+ .items(items)
+ .build();
+ }
+
+}
diff --git a/src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java b/src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java
new file mode 100644
index 00000000..dbecd531
--- /dev/null
+++ b/src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java
@@ -0,0 +1,156 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+
+import com.google.gson.JsonElement;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.junit.jupiter.MockServerExtension;
+import org.mockserver.junit.jupiter.MockServerSettings;
+import org.mockserver.matchers.Times;
+import org.mockserver.verify.VerificationTimes;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.getAsJsonElements;
+
+@ExtendWith(MockServerExtension.class)
+@MockServerSettings(ports = {1080, 8888})
+class PublisherTestMockServer {
+
+ private static final int MAX_IDLE_TIME = 10;
+ private static final int MAX_LIFE_TIME = 20;
+ private static final int CONNECTION_POOL = 1;
+ private static final String TOPIC = "TOPIC10";
+ private static final String PATH = String.format("/events/%s/", TOPIC);
+
+ private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
+ + "{"
+ + "\"requestError\":"
+ + "{"
+ + "\"serviceException\":"
+ + "{"
+ + "\"messageId\":\"SVC0001\","
+ + "\"text\":\"Client timeout exception occurred, Error code is %1\","
+ + "\"variables\":[\"408\"]"
+ + "}"
+ + "}"
+ + "}";
+
+ private final ClientAndServer client;
+
+ public PublisherTestMockServer(ClientAndServer client) {
+ this.client = client;
+ }
+
+ @Test
+ void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
+ //given
+ final Long timeoutSec = 1L;
+ final Publisher publisher = new Publisher(connectionPoolConfiguration());
+ final String simpleEvent = "{\"message\":\"message1\"}";
+ final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
+
+ final String path = String.format("/events/%s/", TOPIC);
+ client.when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ List<String> events = List.of(simpleEvent);
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(events, createPublishRequest(createPublishConfig(), timeoutSec));
+
+
+
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(Duration.ofSeconds(10));
+
+ //then
+ client.verify(request().withPath(path), VerificationTimes.exactly(1));
+
+ }
+
+ @Test
+ void publishEvent_shouldSuccessfullyPublishSingleMessage() {
+ //given
+ final Publisher publisher = new Publisher();
+ final String simpleEvent = "{\"message\":\"message1\"}";
+ final List<String> twoJsonMessages = List.of(simpleEvent);
+ final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
+ client.when(request().withPath(PATH), Times.once())
+ .respond(response());
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(List.of(simpleEvent), createPublishConfig());
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(Duration.ofSeconds(10));
+ }
+
+ private Option<PublisherConfig> createPublishConfig() {
+ List<String> desc = List.of("localhost:1080");
+ PublisherConfig conf = new PublisherConfig(desc, TOPIC);
+ return Option.of(conf);
+ }
+
+ private MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) {
+ return ImmutableMessageRouterPublishResponse
+ .builder()
+ .items(items)
+ .build();
+ }
+
+ public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs) {
+ String failReason = formatArgs.length == 0 ? failReasonFormat : String.format(failReasonFormat, formatArgs);
+ return ImmutableMessageRouterPublishResponse
+ .builder()
+ .failReason(failReason)
+ .build();
+ }
+
+ public MessageRouterPublisherConfig connectionPoolConfiguration() {
+ return ImmutableMessageRouterPublisherConfig.builder()
+ .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
+ .connectionPool(CONNECTION_POOL)
+ .maxIdleTime(MAX_IDLE_TIME)
+ .maxLifeTime(MAX_LIFE_TIME)
+ .build())
+ .build();
+ }
+}
diff --git a/src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java b/src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java
new file mode 100644
index 00000000..05baa04b
--- /dev/null
+++ b/src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java
@@ -0,0 +1,110 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.validator;
+
+import org.json.JSONObject;
+import org.junit.jupiter.api.Test;
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.common.EventUpdater;
+import org.onap.dcae.common.model.VesEvent;
+import org.onap.dcae.restapi.EventValidatorException;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.onap.dcae.common.validator.BatchEventValidator.executeBatchEventValidation;
+
+class BatchEventValidatorTest {
+
+ private final ApplicationSettings settings = mock(ApplicationSettings.class);
+ private final EventUpdater eventUpdater = new EventUpdater(settings);
+ private static final String EVENT = "event";
+ private static final String EVENT_LIST = "eventList";
+
+ @Test
+ void shouldThrowException_whenDomainFieldsHaveDifferentValues() throws IOException {
+ //given
+ final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_valid_two_different_domain.json", EVENT_LIST);
+
+ //when
+ //then
+ assertThrows(EventValidatorException.class, () -> executeBatchEventValidation(eventList));
+ }
+
+ @Test
+ void shouldNotThrowException_whenDomainFieldsHaveSameValues() throws IOException {
+ //given
+ final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_valid.json", EVENT_LIST);
+
+ //when
+ //then
+ assertDoesNotThrow(() -> executeBatchEventValidation(eventList));
+ }
+
+ @Test
+ void shouldThrowException_whenStndDefinedNamespaceFieldsHaveDifferentValuesAndDomainsAreStndDefined() throws IOException {
+ //given
+ final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json", EVENT_LIST);
+
+ //when
+ //then
+ assertThrows(EventValidatorException.class, () -> executeBatchEventValidation(eventList));
+ }
+
+ @Test
+ void shouldNotThrowException_whenStndDefinedNamespaceFieldsHaveSameValuesAndDomainsAreStndDefined() throws IOException {
+ //given
+ final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json", EVENT_LIST);
+
+ //when
+ //then
+ assertDoesNotThrow(() -> executeBatchEventValidation(eventList));
+ }
+
+ @Test
+ void shouldNotThrowException_whenSendValidNotBatchEvent() throws IOException {
+ //given
+ final List<VesEvent> eventList = prepareEventList("src/test/resources/ves_stdnDefined_valid.json", EVENT);
+
+ //when
+ //then
+ assertDoesNotThrow(() -> executeBatchEventValidation(eventList));
+ }
+
+ private List<VesEvent> prepareEventList(String pathToFile, String eventType) throws IOException {
+ final VesEvent vesEventFromJson = createVesEventFromJson(pathToFile);
+ return eventUpdater.convert(vesEventFromJson, "v7", UUID.randomUUID(), eventType);
+ }
+
+ private VesEvent createVesEventFromJson(String pathToFile) throws IOException {
+ Path path = Paths.get(pathToFile);
+ final List<String> lines = Files.readAllLines(path);
+ String str = String.join("", lines);
+ return new VesEvent(new JSONObject(str));
+ }
+
+}