aboutsummaryrefslogtreecommitdiffstats
path: root/src/test
diff options
context:
space:
mode:
authorMaciej Malewski <maciej.malewski@nokia.com>2021-06-08 09:04:48 +0200
committerMaciej Malewski <maciej.malewski@nokia.com>2021-06-17 10:03:49 +0200
commit74b598291ed2461e0e482f556baf2943a97a54f2 (patch)
tree22e1140bc0f27680be4d47d40b0c94f7205b45b3 /src/test
parent26be283f4a7044aea4ee0ca480fde20eb5233ee2 (diff)
Replace cambria with DmaaP client
- remove cambria, add DmaaP client - sending event for many topics at once is no longer supported - add backward compatibility status codes - add additional validation for batchEvent Issue-ID: DCAEGEN2-1483 Signed-off-by: Maciej Malewski <maciej.malewski@nokia.com> Change-Id: I945c38b4ab04b697ecfabd5ce38502f83fa70d1a
Diffstat (limited to 'src/test')
-rw-r--r--src/test/java/org/onap/dcae/ApplicationSettingsTest.java45
-rw-r--r--src/test/java/org/onap/dcae/TLSTest.java2
-rw-r--r--src/test/java/org/onap/dcae/common/EventSenderTest.java35
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java45
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java126
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java126
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java54
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java116
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/PublisherTest.java78
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java156
-rw-r--r--src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java110
-rw-r--r--src/test/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducerTest.java69
-rw-r--r--src/test/java/org/onap/dcae/restapi/ApiAuthInterceptionTest.java2
-rw-r--r--src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java155
-rw-r--r--src/test/java/org/onap/dcae/vestest/TestVESLogger.java76
-rw-r--r--src/test/resources/dmaap-msg-router/MsgRtrApi.properties155
-rw-r--r--src/test/resources/dmaap-msg-router/cadi.properties18
-rw-r--r--src/test/resources/dmaap-msg-router/logback.xml207
-rw-r--r--src/test/resources/dmaap-msg-router/message-router-compose.yml82
-rw-r--r--src/test/resources/dmaap-msg-router/zk_client_jaas.conf5
-rw-r--r--src/test/resources/dmaap-msg-router/zk_server_jaas.conf4
-rw-r--r--src/test/resources/testParseDMaaPCredentialsLegacy.json26
-rw-r--r--src/test/resources/testParseDMaaPLegacy.json21
-rw-r--r--src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json108
-rw-r--r--src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json108
-rw-r--r--src/test/resources/ves7_batch_valid_two_different_domain.json90
26 files changed, 1505 insertions, 514 deletions
diff --git a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
index 6ea94ab5..d5877619 100644
--- a/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
+++ b/src/test/java/org/onap/dcae/ApplicationSettingsTest.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* org.onap.dcaegen2.collectors.ves
* ================================================================================
- * Copyright (C) 2018 - 2020 Nokia. All rights reserved.
+ * Copyright (C) 2018 - 2021 Nokia. All rights reserved.
* Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.schema.JsonSchema;
import io.vavr.collection.HashMap;
import io.vavr.collection.Map;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
@@ -37,11 +38,10 @@ import java.util.Arrays;
import java.util.Objects;
import static java.util.Collections.singletonList;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
import static org.onap.dcae.CLIUtils.processCmdLine;
import static org.onap.dcae.TestingUtilities.createTemporaryFile;
@@ -233,15 +233,6 @@ public class ApplicationSettingsTest {
}
@Test
- public void shouldReturnDefaultDMAAPConfigFileLocation() throws IOException {
- // when
- String dmaapConfigFileLocation = fromTemporaryConfiguration().dMaaPConfigurationFileLocation();
-
- // then
- assertEquals(sanitizePath("etc/DmaapConfig.json"), dmaapConfigFileLocation);
- }
-
- @Test
public void shouldTellIfSchemaValidationIsEnabled() throws IOException {
// when
boolean jsonSchemaValidationEnabled = fromTemporaryConfiguration("collector.schema.checkflag=1")
@@ -315,26 +306,26 @@ public class ApplicationSettingsTest {
@Test
public void shouldReturnDMAAPStreamId() throws IOException {
// given
- Map<String, String[]> expected = HashMap.of(
- "log", new String[]{"ves-syslog", "ves-auditlog"},
- "fault", new String[]{"ves-fault"}
+ Map<String, String> expected = HashMap.of(
+ "log", "ves-syslog",
+ "fault", "ves-fault"
);
// when
- Map<String, String[]> dmaapStreamID = fromTemporaryConfiguration(
- "collector.dmaap.streamid=fault=ves-fault|log=ves-syslog,ves-auditlog")
+ Map<String, String> dmaapStreamID = fromTemporaryConfiguration(
+ "collector.dmaap.streamid=fault=ves-fault,stream1|log=ves-syslog,stream2,stream3")
.getDmaapStreamIds();
// then
- assertArrayEquals(expected.get("log").get(), Objects.requireNonNull(dmaapStreamID).get("log").get());
- assertArrayEquals(expected.get("fault").get(), Objects.requireNonNull(dmaapStreamID).get("fault").get());
+ assertEquals(expected.get("log").get(), Objects.requireNonNull(dmaapStreamID).get("log").get());
+ assertEquals(expected.get("fault").get(), Objects.requireNonNull(dmaapStreamID).get("fault").get());
assertEquals(expected.keySet(), dmaapStreamID.keySet());
}
@Test
public void shouldReturnDefaultDMAAPStreamId() throws IOException {
// when
- Map<String, String[]> dmaapStreamID = fromTemporaryConfiguration().getDmaapStreamIds();
+ Map<String, String> dmaapStreamID = fromTemporaryConfiguration().getDmaapStreamIds();
// then
assertEquals(dmaapStreamID, HashMap.empty());
@@ -391,24 +382,24 @@ public class ApplicationSettingsTest {
}
@Test
- public void shouldReturnCambriaConfigurationFileLocation() throws IOException {
+ public void shouldReturnConfigurationFileLocation() throws IOException {
// when
- String cambriaConfigurationFileLocation = fromTemporaryConfiguration(
- "collector.dmaapfile=/somewhere/dmaapConfig")
+ String configurationFileLocation = fromTemporaryConfiguration(
+ "collector.dmaapfile=/somewhere/etc/ves-dmaap-config.json")
.dMaaPConfigurationFileLocation();
// then
- assertEquals(sanitizePath("/somewhere/dmaapConfig"), cambriaConfigurationFileLocation);
+ assertEquals(sanitizePath("/somewhere/etc/ves-dmaap-config.json"), configurationFileLocation);
}
@Test
- public void shouldReturnDefaultCambriaConfigurationFileLocation() throws IOException {
+ public void shouldReturnDefaultConfigurationFileLocation() throws IOException {
// when
- String cambriaConfigurationFileLocation = fromTemporaryConfiguration()
+ String configurationFileLocation = fromTemporaryConfiguration()
.dMaaPConfigurationFileLocation();
// then
- assertEquals(sanitizePath("etc/DmaapConfig.json"), cambriaConfigurationFileLocation);
+ assertEquals(sanitizePath("etc/ves-dmaap-config.json"), configurationFileLocation);
}
@Test
diff --git a/src/test/java/org/onap/dcae/TLSTest.java b/src/test/java/org/onap/dcae/TLSTest.java
index 424ddf8b..d33ae3ef 100644
--- a/src/test/java/org/onap/dcae/TLSTest.java
+++ b/src/test/java/org/onap/dcae/TLSTest.java
@@ -106,4 +106,4 @@ public class TLSTest extends TLSTestBase {
when(settings.getExternalSchemaStndDefinedDataPath()).thenReturn(STND_DEFINED_DATA_PATH);
}
}
-} \ No newline at end of file
+}
diff --git a/src/test/java/org/onap/dcae/common/EventSenderTest.java b/src/test/java/org/onap/dcae/common/EventSenderTest.java
index 454cfb52..6d508d0a 100644
--- a/src/test/java/org/onap/dcae/common/EventSenderTest.java
+++ b/src/test/java/org/onap/dcae/common/EventSenderTest.java
@@ -1,9 +1,9 @@
/*
* ============LICENSE_START=======================================================
- * PROJECT
+ * VES Collector
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018,2020 Nokia. All rights reserved.
+ * Copyright (C) 2018-2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.onap.dcae.common.model.StndDefinedNamespaceParameterNotDefinedException;
import org.onap.dcae.common.model.VesEvent;
import org.onap.dcae.common.publishing.DMaaPEventPublisher;
+import org.onap.dcae.restapi.EventValidatorException;
import java.io.IOException;
import java.util.List;
@@ -53,31 +54,18 @@ public class EventSenderTest {
List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves7_valid_event.json");
// when
- eventSender.send(eventToSend);
+ assertThatExceptionOfType(EventValidatorException.class)
+ .isThrownBy(() -> eventSender.send(eventToSend));
// then
verifyThatEventWasNotSendAtStream();
}
@Test
- public void shouldSendEventAtStreamsAssignedToEventDomain() throws IOException {
- // given
- EventSender eventSender = givenConfiguredEventSender(HashMap.of("fault", new String[]{"ves-fault", "fault-ves"}));
- List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves7_valid_event.json");
-
- // when
- eventSender.send(eventToSend);
-
- //then
- verifyThatEventWasSendAtStream("ves-fault");
- verifyThatEventWasSendAtStream("fault-ves");
- }
-
- @Test
public void shouldSendStdDefinedEventAtStreamAssignedToEventDomain() throws IOException {
// given
EventSender eventSender = givenConfiguredEventSender(
- HashMap.of("3GPP-FaultSupervision", new String[]{"ves-3gpp-fault-supervision"})
+ HashMap.of("3GPP-FaultSupervision", "ves-3gpp-fault-supervision")
);
List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves_stdnDefined_valid.json");
@@ -95,7 +83,8 @@ public class EventSenderTest {
List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves_stdnDefined_valid.json");
// when
- eventSender.send(eventToSend);
+ assertThatExceptionOfType(EventValidatorException.class)
+ .isThrownBy(() -> eventSender.send(eventToSend));
// then
verifyThatEventWasNotSendAtStream();
@@ -122,7 +111,7 @@ public class EventSenderTest {
return givenEventToSend(event);
}
- private EventSender givenConfiguredEventSender(io.vavr.collection.Map<String, String[]> streamIds) {
+ private EventSender givenConfiguredEventSender(io.vavr.collection.Map<String, String> streamIds) {
return new EventSender(eventPublisher, streamIds);
}
@@ -132,10 +121,10 @@ public class EventSenderTest {
}
private void verifyThatEventWasNotSendAtStream() {
- verify(eventPublisher,never()).sendEvent(any(),any());
+ verify(eventPublisher,never()).sendEvent(any(),any());
}
private void verifyThatEventWasSendAtStream(String s) {
- verify(eventPublisher).sendEvent(any(), eq(s));
- }
+ verify(eventPublisher).sendEvent(any(), eq(s));
+ }
}
diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java
index 923aae02..9aaeb287 100644
--- a/src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java
+++ b/src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java
@@ -3,7 +3,7 @@
* org.onap.dcaegen2.collectors.ves
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018,2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -56,29 +56,6 @@ public class DMaaPConfigurationParserTest {
assertThat(authCredentialsKeysMissing.isSecured()).isFalse();
}
-
- @Test
- public void testParseCredentialsForLegacy() {
- Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsLegacy.json");
- Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path);
-
- PublisherConfig authCredentialsNull = publisherConfigs.get().get("auth-credentials-null").getOrNull();
- assertThat(authCredentialsNull.userName().isEmpty()).isTrue();
- assertThat(authCredentialsNull.password().isEmpty()).isTrue();
- assertThat(authCredentialsNull.isSecured()).isFalse();
-
- PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull();
- assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser");
- assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword");
- assertThat(authCredentialsPresent.isSecured()).isTrue();
-
- PublisherConfig authCredentialsMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull();
- assertThat(authCredentialsMissing.userName().isEmpty()).isTrue();
- assertThat(authCredentialsMissing.password().isEmpty()).isTrue();
- assertThat(authCredentialsMissing.isSecured()).isFalse();
- }
-
-
@Test
public void testParseGen2() {
Path path = Paths.get("src/test/resources/testParseDMaaPGen2.json");
@@ -93,22 +70,4 @@ public class DMaaPConfigurationParserTest {
assertThat(withOtherSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
}
- @Test
- public void testParseLegacy() {
- Path exemplaryConfig = Paths.get("src/test/resources/testParseDMaaPLegacy.json");
- Try<Map<String, PublisherConfig>> publisherConfigs = DMaaPConfigurationParser
- .parseToDomainMapping(exemplaryConfig);
-
- PublisherConfig urlFirstThenHosts = publisherConfigs.get().get("url-precedes-hosts").getOrNull();
- assertThat(urlFirstThenHosts.destinations()).isEqualTo(List("127.0.0.1:3904"));
- assertThat(urlFirstThenHosts.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
-
- PublisherConfig urlKeyMissing = publisherConfigs.get().get("url-key-missing").getOrNull();
- assertThat(urlKeyMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com"));
- assertThat(urlKeyMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
-
- PublisherConfig urlIsMissing = publisherConfigs.get().get("url-is-null").getOrNull();
- assertThat(urlIsMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com"));
- assertThat(urlIsMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
- }
-} \ No newline at end of file
+}
diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java
deleted file mode 100644
index e4b6fd91..00000000
--- a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2018,2020 Nokia. All rights reserved.
- * Copyright (C) 2020 AT&T. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.common.publishing;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import org.json.JSONObject;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.dcae.common.model.VesEvent;
-
-import java.io.IOException;
-
-import static io.vavr.API.Option;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class DMaaPEventPublisherTest {
-
- private static final String STREAM_ID = "sampleStreamId";
-
- private static final JSONObject EXPECTED_EVENT =
- new JSONObject(
- "{\"VESversion\":\"v7\",\"event\":{"
- + "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,"
- + "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\","
- + "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\","
- + "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\","
- + "\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,"
- + "\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\","
- + "\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\","
- + "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}");
-
- private static final String PARTITION = "dns01cmd004";
-
- private DMaaPEventPublisher eventPublisher;
- private CambriaBatchingPublisher cambriaPublisher;
- private DMaaPPublishersCache DMaaPPublishersCache;
-
- @Before
- public void setUp() {
- cambriaPublisher = mock(CambriaBatchingPublisher.class);
- DMaaPPublishersCache = mock(DMaaPPublishersCache.class);
- when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher));
- eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache);
- }
-
- @Test
- public void shouldSendEventToTopic() throws Exception {
- // when
- eventPublisher.sendEvent(givenVesEventWithoutVESuniqueIdField(), STREAM_ID);
-
- // then
- verify(cambriaPublisher).send(PARTITION, EXPECTED_EVENT.toString());
- }
-
- @Test
- public void shouldRemoveInternalVESUIDBeforeSending() throws Exception {
- // when
- eventPublisher.sendEvent(givenVesEventWithVESUniqueIdField(), STREAM_ID);
-
- // then
- verify(cambriaPublisher).send(PARTITION, EXPECTED_EVENT.toString());
- }
-
- @Test
- public void shouldCloseConnectionWhenExceptionOccurred() throws Exception {
- // given
- given(cambriaPublisher.send(anyString(), anyString()))
- .willThrow(new IOException("Expected exception - test case scenario!"));
-
- // when
- eventPublisher.sendEvent(givenVesEventWithVESUniqueIdField(), STREAM_ID);
-
- // then
- verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID);
- }
-
- private VesEvent givenVesEventWithVESUniqueIdField() {
- return new VesEvent(
- new JSONObject(
- "{\"VESversion\":\"v7\",\"VESuniqueId\":\"fd69d432-5cd5-4c15-9d34-407c81c61c6a-0\"," +
- "\"event\":{" +
- "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," +
- "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," +
- "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," +
- "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\"," +
- "\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312," +
- "\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," +
- "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"));
- }
-
- private VesEvent givenVesEventWithoutVESuniqueIdField() {
- return new VesEvent(
- new JSONObject(
- "{\"VESversion\":\"v7\"," +
- "\"event\":{" +
- "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," +
- "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," +
- "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," +
- "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\"," +
- "\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312," +
- "\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," +
- "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"));
- }
-}
diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java
deleted file mode 100644
index f4dbe190..00000000
--- a/src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.common.publishing;
-
-import static io.vavr.API.List;
-import static io.vavr.API.Map;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import io.vavr.collection.Map;
-import io.vavr.control.Option;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.dcae.common.publishing.DMaaPPublishersCache.CambriaPublishersCacheLoader;
-import org.onap.dcae.common.publishing.DMaaPPublishersCache.OnPublisherRemovalListener;
-
-
-public class DMaaPPublishersCacheTest {
-
- private String streamId1;
- private Map<String, PublisherConfig> dMaaPConfigs;
-
- @Before
- public void setUp() {
- streamId1 = "sampleStream1";
- dMaaPConfigs = Map("sampleStream1", new PublisherConfig(List("destination1"), "topic1"));
- }
-
- @Test
- public void shouldReturnTheSameCachedInstanceOnConsecutiveRetrievals() {
- // given
- DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
-
- // when
- Option<CambriaBatchingPublisher> firstPublisher = dMaaPPublishersCache.getPublisher(streamId1);
- Option<CambriaBatchingPublisher> secondPublisher = dMaaPPublishersCache.getPublisher(streamId1);
-
- // then
- assertSame("should return same instance", firstPublisher.get(), secondPublisher.get());
- }
-
- @Test
- public void shouldCloseCambriaPublisherOnCacheInvalidate() throws IOException, InterruptedException {
- // given
- CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
- CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
- DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
- new OnPublisherRemovalListener(),
- dMaaPConfigs);
- when(cacheLoaderMock.load(streamId1)).thenReturn(cambriaPublisherMock1);
-
- // when
- dMaaPPublishersCache.getPublisher(streamId1);
- dMaaPPublishersCache.closePublisherFor(streamId1);
-
- // then
- verify(cambriaPublisherMock1).close(20, TimeUnit.SECONDS);
-
- }
-
- @Test
- public void shouldReturnNoneIfThereIsNoDMaaPConfigurationForGivenStreamID() {
- // given
- DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
-
- // then
- assertTrue("should not exist", dMaaPPublishersCache.getPublisher("non-existing").isEmpty());
- }
-
-
- @Test
- public void shouldCloseOnlyChangedPublishers() throws IOException, InterruptedException {
- // given
- CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
- CambriaBatchingPublisher cambriaPublisherMock2 = mock(CambriaBatchingPublisher.class);
- CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
- String firstDomain = "domain1";
- String secondDomain = "domain2";
- Map<String, PublisherConfig> oldConfig = Map(firstDomain,
- new PublisherConfig(List("destination1"), "topic1"),
- secondDomain,
- new PublisherConfig(List("destination2"), "topic2",
- "user", "pass"));
- Map<String, PublisherConfig> newConfig = Map(firstDomain, new PublisherConfig(List("destination1"), "topic1"),
- secondDomain, new PublisherConfig(List("destination2"), "topic2"));
- DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
- new OnPublisherRemovalListener(),
- oldConfig);
- when(cacheLoaderMock.load(firstDomain)).thenReturn(cambriaPublisherMock1);
- when(cacheLoaderMock.load(secondDomain)).thenReturn(cambriaPublisherMock2);
-
- dMaaPPublishersCache.getPublisher(firstDomain);
- dMaaPPublishersCache.getPublisher(secondDomain);
-
- // when
- dMaaPPublishersCache.reconfigure(newConfig);
-
- // then
- verify(cambriaPublisherMock2).close(20, TimeUnit.SECONDS);
- verifyZeroInteractions(cambriaPublisherMock1);
- }
-} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java b/src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java
new file mode 100644
index 00000000..9ece10b5
--- /dev/null
+++ b/src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START====================================
+ * VES Collector
+ * =========================================================
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcae.common.publishing;
+
+import org.testcontainers.containers.DockerComposeContainer;
+
+import java.io.File;
+import java.net.URL;
+
+final class DMaapContainer {
+ private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
+ private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(MR_COMPOSE_RESOURCE_NAME);
+ static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
+ static final String DMAAP_SERVICE_NAME = "onap-dmaap";
+
+ private DMaapContainer() {}
+
+
+ public static DockerComposeContainer createContainerInstance(){
+ return new DockerComposeContainer(
+ new File(DOCKER_COMPOSE_FILE_PATH))
+ .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)
+ .withLocalCompose(true);
+ }
+
+
+
+ private static String getDockerComposeFilePath(String resourceName) {
+ URL resource = DMaapContainer.class.getClassLoader()
+ .getResource(resourceName);
+
+ if (resource != null) return resource.getFile();
+ else throw new RuntimeException(String
+ .format("File %s does not exist", resourceName));
+ }
+}
diff --git a/src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java b/src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java
new file mode 100644
index 00000000..0e5ae908
--- /dev/null
+++ b/src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java
@@ -0,0 +1,116 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.onap.dcae.common.model.BackwardsCompatibilityException;
+import org.onap.dcae.common.model.InternalException;
+import org.onap.dcae.common.model.PayloadToLargeException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.springframework.http.HttpStatus;
+
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.onap.dcae.ApplicationSettings.responseCompatibility;
+import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus;
+
+class MessageRouterHttpStatusMapperTest {
+
+ public static final String BACKWARDS_COMPATIBILITY = "v7.2";
+ public static final String BACKWARDS_COMPATIBILITY_NONE = "NONE";
+
+ @Test
+ void ves_shouldResponse202() {
+ //given
+ responseCompatibility = BACKWARDS_COMPATIBILITY;
+ MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+ when(messageRouterPublishResponse.successful()).thenReturn(true);
+
+ //when
+ HttpStatus httpStatusResponse = getHttpStatus(messageRouterPublishResponse);
+
+ //then
+ assertSame(HttpStatus.ACCEPTED, httpStatusResponse);
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = HttpStatus.class,
+ names = {"NOT_FOUND", "REQUEST_TIMEOUT", "TOO_MANY_REQUESTS", "INTERNAL_SERVER_ERROR", "BAD_GATEWAY",
+ "SERVICE_UNAVAILABLE", "GATEWAY_TIMEOUT","PAYLOAD_TOO_LARGE"}
+ )
+ void ves_shouldMapErrorsToBackwardsCompatibility(HttpStatus httpStatus) {
+ //given
+ responseCompatibility = BACKWARDS_COMPATIBILITY;
+ MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+ when(messageRouterPublishResponse.failReason()).thenReturn(httpStatus.toString());
+
+ //when
+ //then
+ assertThrows(BackwardsCompatibilityException.class,()->getHttpStatus(messageRouterPublishResponse));
+ }
+
+ @Test
+ void ves_shouldResponse200WhenBackwardsCompatibilityIsNone() {
+ //given
+ responseCompatibility = BACKWARDS_COMPATIBILITY_NONE;
+ MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+ when(messageRouterPublishResponse.successful()).thenReturn(true);
+
+ //when
+ HttpStatus httpStatusResponse = getHttpStatus(messageRouterPublishResponse);
+
+ //then
+ assertSame(HttpStatus.OK, httpStatusResponse);
+ }
+
+ @Test
+ void ves_shouldHandleError413WhenBackwardsCompatibilityIsNone() {
+ //given
+ responseCompatibility = BACKWARDS_COMPATIBILITY_NONE;
+ MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+ when(messageRouterPublishResponse.failReason()).thenReturn(HttpStatus.PAYLOAD_TOO_LARGE.toString());
+
+ //when
+ //then
+ assertThrows(PayloadToLargeException.class,()->getHttpStatus(messageRouterPublishResponse));
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = HttpStatus.class,
+ names = {"NOT_FOUND", "REQUEST_TIMEOUT", "TOO_MANY_REQUESTS", "INTERNAL_SERVER_ERROR", "BAD_GATEWAY",
+ "SERVICE_UNAVAILABLE", "GATEWAY_TIMEOUT"}
+ )
+ void ves_shouldMapErrorsTo503WhenBackwardsCompatibilityIsNone(HttpStatus httpStatus) {
+ //given
+ responseCompatibility = BACKWARDS_COMPATIBILITY_NONE;
+ MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+ when(messageRouterPublishResponse.failReason()).thenReturn(httpStatus.toString());
+
+ //when
+ //then
+ assertThrows(InternalException.class,()->getHttpStatus(messageRouterPublishResponse));
+ }
+}
diff --git a/src/test/java/org/onap/dcae/common/publishing/PublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/PublisherTest.java
new file mode 100644
index 00000000..f269b942
--- /dev/null
+++ b/src/test/java/org/onap/dcae/common/publishing/PublisherTest.java
@@ -0,0 +1,78 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.google.gson.JsonElement;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.time.Duration;
+
+import static org.onap.dcae.common.publishing.DMaapContainer.createContainerInstance;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.getAsJsonElements;
+
+
+@Testcontainers
+public class PublisherTest {
+
+ @Container
+ private final DockerComposeContainer CONTAINER = createContainerInstance();
+
+ @Test
+ void publishEvent_shouldSuccessfullyPublishSingleMessage() {
+ //given
+ final Publisher publisher = new Publisher();
+ final String simpleEvent = "{\"message\":\"message1\"}";
+ final List<String> twoJsonMessages = List.of(simpleEvent);
+ final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(twoJsonMessages, createPublishConfig());
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(Duration.ofSeconds(10));
+ }
+
+
+ private Option<PublisherConfig> createPublishConfig() {
+ List<String> desc = List.of("127.0.0.1:3904");
+ PublisherConfig conf = new PublisherConfig(desc, "topic");
+ return Option.of(conf);
+ }
+
+ private MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) {
+ return ImmutableMessageRouterPublishResponse
+ .builder()
+ .items(items)
+ .build();
+ }
+
+}
diff --git a/src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java b/src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java
new file mode 100644
index 00000000..dbecd531
--- /dev/null
+++ b/src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java
@@ -0,0 +1,156 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+
+import com.google.gson.JsonElement;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.junit.jupiter.MockServerExtension;
+import org.mockserver.junit.jupiter.MockServerSettings;
+import org.mockserver.matchers.Times;
+import org.mockserver.verify.VerificationTimes;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.getAsJsonElements;
+
+@ExtendWith(MockServerExtension.class)
+@MockServerSettings(ports = {1080, 8888})
+class PublisherTestMockServer {
+
+ private static final int MAX_IDLE_TIME = 10;
+ private static final int MAX_LIFE_TIME = 20;
+ private static final int CONNECTION_POOL = 1;
+ private static final String TOPIC = "TOPIC10";
+ private static final String PATH = String.format("/events/%s/", TOPIC);
+
+ private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
+ + "{"
+ + "\"requestError\":"
+ + "{"
+ + "\"serviceException\":"
+ + "{"
+ + "\"messageId\":\"SVC0001\","
+ + "\"text\":\"Client timeout exception occurred, Error code is %1\","
+ + "\"variables\":[\"408\"]"
+ + "}"
+ + "}"
+ + "}";
+
+ private final ClientAndServer client;
+
+ public PublisherTestMockServer(ClientAndServer client) {
+ this.client = client;
+ }
+
+ @Test
+ void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
+ //given
+ final Long timeoutSec = 1L;
+ final Publisher publisher = new Publisher(connectionPoolConfiguration());
+ final String simpleEvent = "{\"message\":\"message1\"}";
+ final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
+
+ final String path = String.format("/events/%s/", TOPIC);
+ client.when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ List<String> events = List.of(simpleEvent);
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(events, createPublishRequest(createPublishConfig(), timeoutSec));
+
+
+
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(Duration.ofSeconds(10));
+
+ //then
+ client.verify(request().withPath(path), VerificationTimes.exactly(1));
+
+ }
+
+ @Test
+ void publishEvent_shouldSuccessfullyPublishSingleMessage() {
+ //given
+ final Publisher publisher = new Publisher();
+ final String simpleEvent = "{\"message\":\"message1\"}";
+ final List<String> twoJsonMessages = List.of(simpleEvent);
+ final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
+ client.when(request().withPath(PATH), Times.once())
+ .respond(response());
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(List.of(simpleEvent), createPublishConfig());
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(Duration.ofSeconds(10));
+ }
+
+ private Option<PublisherConfig> createPublishConfig() {
+ List<String> desc = List.of("localhost:1080");
+ PublisherConfig conf = new PublisherConfig(desc, TOPIC);
+ return Option.of(conf);
+ }
+
+ private MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) {
+ return ImmutableMessageRouterPublishResponse
+ .builder()
+ .items(items)
+ .build();
+ }
+
+ public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs) {
+ String failReason = formatArgs.length == 0 ? failReasonFormat : String.format(failReasonFormat, formatArgs);
+ return ImmutableMessageRouterPublishResponse
+ .builder()
+ .failReason(failReason)
+ .build();
+ }
+
+ public MessageRouterPublisherConfig connectionPoolConfiguration() {
+ return ImmutableMessageRouterPublisherConfig.builder()
+ .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
+ .connectionPool(CONNECTION_POOL)
+ .maxIdleTime(MAX_IDLE_TIME)
+ .maxLifeTime(MAX_LIFE_TIME)
+ .build())
+ .build();
+ }
+}
diff --git a/src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java b/src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java
new file mode 100644
index 00000000..05baa04b
--- /dev/null
+++ b/src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java
@@ -0,0 +1,110 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.validator;
+
+import org.json.JSONObject;
+import org.junit.jupiter.api.Test;
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.common.EventUpdater;
+import org.onap.dcae.common.model.VesEvent;
+import org.onap.dcae.restapi.EventValidatorException;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.onap.dcae.common.validator.BatchEventValidator.executeBatchEventValidation;
+
+class BatchEventValidatorTest {
+
+ private final ApplicationSettings settings = mock(ApplicationSettings.class);
+ private final EventUpdater eventUpdater = new EventUpdater(settings);
+ private static final String EVENT = "event";
+ private static final String EVENT_LIST = "eventList";
+
+ @Test
+ void shouldThrowException_whenDomainFieldsHaveDifferentValues() throws IOException {
+ //given
+ final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_valid_two_different_domain.json", EVENT_LIST);
+
+ //when
+ //then
+ assertThrows(EventValidatorException.class, () -> executeBatchEventValidation(eventList));
+ }
+
+ @Test
+ void shouldNotThrowException_whenDomainFieldsHaveSameValues() throws IOException {
+ //given
+ final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_valid.json", EVENT_LIST);
+
+ //when
+ //then
+ assertDoesNotThrow(() -> executeBatchEventValidation(eventList));
+ }
+
+ @Test
+ void shouldThrowException_whenStndDefinedNamespaceFieldsHaveDifferentValuesAndDomainsAreStndDefined() throws IOException {
+ //given
+ final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json", EVENT_LIST);
+
+ //when
+ //then
+ assertThrows(EventValidatorException.class, () -> executeBatchEventValidation(eventList));
+ }
+
+ @Test
+ void shouldNotThrowException_whenStndDefinedNamespaceFieldsHaveSameValuesAndDomainsAreStndDefined() throws IOException {
+ //given
+ final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json", EVENT_LIST);
+
+ //when
+ //then
+ assertDoesNotThrow(() -> executeBatchEventValidation(eventList));
+ }
+
+ @Test
+ void shouldNotThrowException_whenSendValidNotBatchEvent() throws IOException {
+ //given
+ final List<VesEvent> eventList = prepareEventList("src/test/resources/ves_stdnDefined_valid.json", EVENT);
+
+ //when
+ //then
+ assertDoesNotThrow(() -> executeBatchEventValidation(eventList));
+ }
+
+ private List<VesEvent> prepareEventList(String pathToFile, String eventType) throws IOException {
+ final VesEvent vesEventFromJson = createVesEventFromJson(pathToFile);
+ return eventUpdater.convert(vesEventFromJson, "v7", UUID.randomUUID(), eventType);
+ }
+
+ private VesEvent createVesEventFromJson(String pathToFile) throws IOException {
+ Path path = Paths.get(pathToFile);
+ final List<String> lines = Files.readAllLines(path);
+ String str = String.join("", lines);
+ return new VesEvent(new JSONObject(str));
+ }
+
+}
diff --git a/src/test/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducerTest.java b/src/test/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducerTest.java
new file mode 100644
index 00000000..d085eb13
--- /dev/null
+++ b/src/test/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducerTest.java
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.multiplestreamreducer;
+
+import io.vavr.collection.HashMap;
+import io.vavr.collection.Map;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class MultipleStreamReducerTest {
+
+ private final MultipleStreamReducer multipleStreamReducer = new MultipleStreamReducer();
+ private final Map<String, String[]> domainToStreams = HashMap.of(
+ "fault", new String[]{"ves-fault", "stream1", "stream2"},
+ "log", new String[]{"ves-syslog", "stream3", "stream4", "stream5"},
+ "test", new String[]{"stream6"}
+ );
+
+ @Test
+ void shouldReduceStreamsToTheFirstOne() {
+ //given
+ Map<String, String> expected = HashMap.of(
+ "fault", "ves-fault",
+ "log", "ves-syslog",
+ "test", "stream6"
+ );
+
+ //when
+ final Map<String, String> domainToStreamsAfterReduce = multipleStreamReducer.reduce(domainToStreams);
+
+ //then
+ assertEquals(expected, domainToStreamsAfterReduce);
+ }
+
+ @Test
+ void shouldReturnInfoAboutDomainToStreamsConfig() {
+ //given
+ final Map<String, String> domainToStreamsAfterReduce = multipleStreamReducer.reduce(domainToStreams);
+ String expectedRedundantStreamsInfo =
+ "Domain: fault has active stream: ves-fault\n" +
+ "Domain: log has active stream: ves-syslog\n" +
+ "Domain: test has active stream: stream6\n";
+
+ //when
+ final String domainToStreamsConfigInfo = multipleStreamReducer.getDomainToStreamsInfo(domainToStreamsAfterReduce);
+
+ //then
+ assertEquals(expectedRedundantStreamsInfo, domainToStreamsConfigInfo);
+ }
+
+}
diff --git a/src/test/java/org/onap/dcae/restapi/ApiAuthInterceptionTest.java b/src/test/java/org/onap/dcae/restapi/ApiAuthInterceptionTest.java
index 9df0c694..931e7bc3 100644
--- a/src/test/java/org/onap/dcae/restapi/ApiAuthInterceptionTest.java
+++ b/src/test/java/org/onap/dcae/restapi/ApiAuthInterceptionTest.java
@@ -143,4 +143,4 @@ public class ApiAuthInterceptionTest {
healthcheckRequest.setServerPort(serverPort);
return healthcheckRequest;
}
-} \ No newline at end of file
+}
diff --git a/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java b/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java
index a3c0628d..9b436871 100644
--- a/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java
+++ b/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java
@@ -1,8 +1,8 @@
/*
* ============LICENSE_START=======================================================
- * PROJECT
+ * VES Collector
* ================================================================================
- * Copyright (C) 2020 Nokia. All rights reserved.s
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,24 +25,28 @@ import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.networknt.schema.JsonSchema;
import io.vavr.collection.HashMap;
-import org.apache.http.HttpStatus;
import org.jetbrains.annotations.NotNull;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
import org.onap.dcae.ApplicationSettings;
import org.onap.dcae.JSonSchemasSupplier;
import org.onap.dcae.common.EventSender;
import org.onap.dcae.common.EventTransformation;
import org.onap.dcae.common.HeaderUtils;
import org.onap.dcae.common.JsonDataLoader;
-import org.onap.dcae.common.model.VesEvent;
-import org.onap.dcae.common.validator.StndDefinedDataValidator;
+import org.onap.dcae.common.model.InternalException;
+import org.onap.dcae.common.model.PayloadToLargeException;
import org.onap.dcae.common.publishing.DMaaPEventPublisher;
+import org.onap.dcae.common.validator.StndDefinedDataValidator;
import org.slf4j.Logger;
+import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.mock.web.MockHttpServletRequest;
import org.springframework.web.context.request.RequestContextHolder;
@@ -53,8 +57,10 @@ import java.io.IOException;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
@@ -63,14 +69,15 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.class)
+
+@ExtendWith(MockitoExtension.class)
public class VesRestControllerTest {
private static final String EVENT_TRANSFORM_FILE_PATH = "/eventTransform.json";
- private static final String ACCEPTED = "Accepted";
+ private static final String ACCEPTED = "Successfully send event";
private static final String VERSION_V7 = "v7";
- public static final String VES_FAULT_TOPIC = "ves-fault";
- public static final String VES_3_GPP_FAULT_SUPERVISION_TOPIC = "ves-3gpp-fault-supervision";
+ static final String VES_FAULT_TOPIC = "ves-fault";
+ static final String VES_3_GPP_FAULT_SUPERVISION_TOPIC = "ves-3gpp-fault-supervision";
private VesRestController vesRestController;
@@ -92,18 +99,18 @@ public class VesRestControllerTest {
@Mock
private StndDefinedDataValidator stndDefinedDataValidator;
- @Before
- public void setUp(){
- final HashMap<String, String[]> streamIds = HashMap.of(
- "fault", new String[]{VES_FAULT_TOPIC},
- "3GPP-FaultSupervision", new String[]{VES_3_GPP_FAULT_SUPERVISION_TOPIC}
+ @BeforeEach
+ void setUp(){
+ final HashMap<String, String> streamIds = HashMap.of(
+ "fault", VES_FAULT_TOPIC,
+ "3GPP-FaultSupervision", VES_3_GPP_FAULT_SUPERVISION_TOPIC
);
this.vesRestController = new VesRestController(applicationSettings, logger,
errorLogger, new EventSender(eventPublisher, streamIds), headerUtils, stndDefinedDataValidator);
}
@Test
- public void shouldReportThatApiVersionIsNotSupported() {
+ void shouldReportThatApiVersionIsNotSupported() {
// given
when(applicationSettings.isVersionSupported("v20")).thenReturn(false);
MockHttpServletRequest request = givenMockHttpServletRequest();
@@ -112,33 +119,33 @@ public class VesRestControllerTest {
final ResponseEntity<String> event = vesRestController.event("", "v20", request);
// then
- assertThat(event.getStatusCodeValue()).isEqualTo(HttpStatus.SC_BAD_REQUEST);
+ assertThat(event.getStatusCodeValue()).isEqualTo(HttpStatus.BAD_REQUEST.value());
assertThat(event.getBody()).isEqualTo("API version v20 is not supported");
verifyThatEventWasNotSend();
}
@Test
- public void shouldTransformEventAccordingToEventTransformFile() throws IOException {
+ void shouldTransformEventAccordingToEventTransformFile() throws IOException {
//given
configureEventTransformations();
configureHeadersForEventListener();
MockHttpServletRequest request = givenMockHttpServletRequest();
-
String validEvent = JsonDataLoader.loadContent("/ves7_valid_30_1_1_event.json");
+ when(eventPublisher.sendEvent(any(), any())).thenReturn((HttpStatus.OK));
//when
final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
//then
- assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED);
+ assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value());
assertThat(response.getBody()).isEqualTo(ACCEPTED);
verifyThatTransformedEventWasSend(eventPublisher, validEvent);
}
@Test
- public void shouldSendBatchOfEvents() throws IOException {
+ void shouldSendBatchEvent() throws IOException {
//given
configureEventTransformations();
configureHeadersForEventListener();
@@ -146,18 +153,18 @@ public class VesRestControllerTest {
MockHttpServletRequest request = givenMockHttpServletRequest();
String validEvent = JsonDataLoader.loadContent("/ves7_batch_valid.json");
-
+ when(eventPublisher.sendEvent(any(), any())).thenReturn(HttpStatus.OK);
//when
final ResponseEntity<String> response = vesRestController.events(validEvent, VERSION_V7, request);
//then
- assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED);
+ assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value());
assertThat(response.getBody()).isEqualTo(ACCEPTED);
- verify(eventPublisher, times(2)).sendEvent(any(),any());
+ verify(eventPublisher, times(1)).sendEvent(any(),any());
}
@Test
- public void shouldSendStndDomainEventIntoDomainStream() throws IOException {
+ void shouldSendStndDomainEventIntoDomainStream() throws IOException {
//given
configureEventTransformations();
configureHeadersForEventListener();
@@ -166,19 +173,20 @@ public class VesRestControllerTest {
configureSchemasSupplierForStndDefineEvent();
String validEvent = JsonDataLoader.loadContent("/ves_stdnDefined_valid.json");
+ when(eventPublisher.sendEvent(any(), any())).thenReturn(HttpStatus.OK);
//when
final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
//then
- assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED);
+ assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value());
assertThat(response.getBody()).isEqualTo(ACCEPTED);
verify(eventPublisher).sendEvent(any(),eq(VES_3_GPP_FAULT_SUPERVISION_TOPIC));
}
@Test
- public void shouldReportThatStndDomainEventHasntGotNamespaceParameter() throws IOException {
+ void shouldReportThatStndDomainEventHasntGotNamespaceParameter() throws IOException {
//given
configureEventTransformations();
configureHeadersForEventListener();
@@ -192,7 +200,7 @@ public class VesRestControllerTest {
final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
//then
- assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_BAD_REQUEST);
+ assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.BAD_REQUEST.value());
verifyErrorResponse(
response,
"SVC2006",
@@ -203,7 +211,7 @@ public class VesRestControllerTest {
}
@Test
- public void shouldReportThatStndDomainEventNamespaceParameterIsEmpty() throws IOException {
+ void shouldReportThatStndDomainEventNamespaceParameterIsEmpty() throws IOException {
//given
configureEventTransformations();
configureHeadersForEventListener();
@@ -217,7 +225,7 @@ public class VesRestControllerTest {
final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
//then
- assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_BAD_REQUEST);
+ assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.BAD_REQUEST.value());
verifyErrorResponse(
response,
"SVC2006",
@@ -228,7 +236,7 @@ public class VesRestControllerTest {
}
@Test
- public void shouldNotSendStndDomainEventWhenTopicCannotBeFoundInConfiguration() throws IOException {
+ void shouldNotSendStndDomainEventWhenTopicCannotBeFoundInConfiguration() throws IOException {
//given
configureEventTransformations();
configureHeadersForEventListener();
@@ -240,13 +248,12 @@ public class VesRestControllerTest {
final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
//then
- assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED);
- assertThat(response.getBody()).isEqualTo(ACCEPTED);
+ assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.BAD_REQUEST.value());
verifyThatEventWasNotSend();
}
@Test
- public void shouldExecuteStndDefinedValidationWhenFlagIsOnTrue() throws IOException {
+ void shouldExecuteStndDefinedValidationWhenFlagIsOnTrue() throws IOException {
//given
configureEventTransformations();
configureHeadersForEventListener();
@@ -254,18 +261,18 @@ public class VesRestControllerTest {
MockHttpServletRequest request = givenMockHttpServletRequest();
String validEvent = JsonDataLoader.loadContent("/ves7_batch_with_stndDefined_valid.json");
when(applicationSettings.getExternalSchemaValidationCheckflag()).thenReturn(true);
-
+ when(eventPublisher.sendEvent(any(), any())).thenReturn(HttpStatus.OK);
//when
final ResponseEntity<String> response = vesRestController.events(validEvent, VERSION_V7, request);
//then
- assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED);
+ assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value());
assertThat(response.getBody()).isEqualTo(ACCEPTED);
verify(stndDefinedDataValidator, times(2)).validate(any());
}
@Test
- public void shouldNotExecuteStndDefinedValidationWhenFlagIsOnFalse() throws IOException {
+ void shouldNotExecuteStndDefinedValidationWhenFlagIsOnFalse() throws IOException {
//given
configureEventTransformations();
configureHeadersForEventListener();
@@ -273,16 +280,76 @@ public class VesRestControllerTest {
MockHttpServletRequest request = givenMockHttpServletRequest();
String validEvent = JsonDataLoader.loadContent("/ves7_batch_with_stndDefined_valid.json");
when(applicationSettings.getExternalSchemaValidationCheckflag()).thenReturn(false);
+ when(eventPublisher.sendEvent(any(), any())).thenReturn(HttpStatus.OK);
//when
final ResponseEntity<String> response = vesRestController.events(validEvent, VERSION_V7, request);
//then
- assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED);
+ assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value());
assertThat(response.getBody()).isEqualTo(ACCEPTED);
verify(stndDefinedDataValidator, times(0)).validate(any());
}
+ @Test
+ void shouldReturn413WhenPayloadIsTooLarge() throws IOException {
+ //given
+ configureEventTransformations();
+ configureHeadersForEventListener();
+
+ MockHttpServletRequest request = givenMockHttpServletRequest();
+ when(eventPublisher.sendEvent(any(), any())).thenThrow(new PayloadToLargeException());
+ String validEvent = JsonDataLoader.loadContent("/ves7_valid_30_1_1_event.json");
+
+ //when
+ final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
+
+ //then
+ assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.PAYLOAD_TOO_LARGE.value());
+ verifyErrorResponse(
+ response,
+ "SVC2000",
+ "The following service error occurred: %1. Error code is %2",
+ List.of("Request Entity Too Large","413")
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("errorsCodeAndResponseBody")
+ void shouldMapErrorTo503AndReturnOriginalBody(ApiException apiException,String bodyVariable,String bodyVariable2) throws IOException {
+ //given
+ configureEventTransformations();
+ configureHeadersForEventListener();
+
+ MockHttpServletRequest request = givenMockHttpServletRequest();
+ when(eventPublisher.sendEvent(any(), any())).thenThrow(new InternalException(apiException));
+ String validEvent = JsonDataLoader.loadContent("/ves7_valid_30_1_1_event.json");
+
+ //when
+ final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
+
+ //then
+ assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE.value());
+ verifyErrorResponse(
+ response,
+ "SVC2000",
+ "The following service error occurred: %1. Error code is %2",
+ List.of(bodyVariable,bodyVariable2)
+ );
+ }
+
+ private static Stream<Arguments> errorsCodeAndResponseBody() {
+ return Stream.of(
+ arguments(ApiException.NOT_FOUND, "Not Found","404"),
+ arguments(ApiException.REQUEST_TIMEOUT, "Request Timeout","408"),
+ arguments(ApiException.TOO_MANY_REQUESTS, "Too Many Requests","429"),
+ arguments(ApiException.INTERNAL_SERVER_ERROR, "Internal Server Error","500"),
+ arguments(ApiException.BAD_GATEWAY, "Bad Gateway","502"),
+ arguments(ApiException.SERVICE_UNAVAILABLE, "Service Unavailable","503"),
+ arguments(ApiException.GATEWAY_TIMEOUT, "Gateway Timeout","504")
+ );
+ }
+
private void verifyThatEventWasNotSend() {
verify(eventPublisher, never()).sendEvent(any(), any());
}
@@ -313,7 +380,7 @@ public class VesRestControllerTest {
final List<EventTransformation> eventTransformations = loadEventTransformations();
when(applicationSettings.isVersionSupported(VERSION_V7)).thenReturn(true);
when(applicationSettings.eventTransformingEnabled()).thenReturn(true);
- when(applicationSettings.getEventTransformations()).thenReturn(eventTransformations);
+ when(applicationSettings.getEventTransformations()).thenReturn((eventTransformations));
}
private void configureHeadersForEventListener() {
@@ -326,11 +393,11 @@ public class VesRestControllerTest {
assertThat(eventBeforeTransformation).contains("\"version\": \"4.0.1\"");
assertThat(eventBeforeTransformation).contains("\"faultFieldsVersion\": \"4.0\"");
- ArgumentCaptor<VesEvent> argument = ArgumentCaptor.forClass(VesEvent.class);
+ ArgumentCaptor<List> argument = ArgumentCaptor.forClass(List.class);
ArgumentCaptor<String> domain = ArgumentCaptor.forClass(String.class);
verify(eventPublisher).sendEvent(argument.capture(), domain.capture());
- final String transformedEvent = argument.getValue().asJsonObject().toString();
+ final String transformedEvent = argument.getValue().toString();
final String eventSentAtTopic = domain.getValue();
// event after transformation
diff --git a/src/test/java/org/onap/dcae/vestest/TestVESLogger.java b/src/test/java/org/onap/dcae/vestest/TestVESLogger.java
deleted file mode 100644
index 1689263e..00000000
--- a/src/test/java/org/onap/dcae/vestest/TestVESLogger.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.vestest;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.onap.dcae.common.VESLogger.REQUEST_ID;
-
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
-import java.util.UUID;
-import org.junit.Test;
-import org.onap.dcae.common.VESLogger;
-
-public class TestVESLogger {
-
- @Test
- public void shouldOnLoggingContextInitializationPutRandomUuidAsRequestId() {
- LoggingContext commonLoggingContext = VESLogger.getCommonLoggingContext();
- String requestId = commonLoggingContext.get(REQUEST_ID, "default");
-
- assertNotNull(requestId);
- assertNotSame(requestId, "default");
-
- }
-
- @Test
- public void shouldOnLoggingContextInitializationPutGivenUuuidAsRequestIdAndSupplyEndTimestamp() {
- final UUID uuid = UUID.randomUUID();
- LoggingContext loggingContextForThread = VESLogger.getLoggingContextForThread(uuid);
- String requestId = loggingContextForThread.get(REQUEST_ID, "default");
- String endTimestamp = loggingContextForThread.get(EcompFields.kEndTimestamp, "default");
-
- assertNotNull(requestId);
- assertNotNull(endTimestamp);
- assertNotSame(endTimestamp, "default");
- assertEquals(requestId, uuid.toString());
- }
-
- @Test
- public void shouldOnLoggingContextInitializationPutGivenUuidAsRequestIdAndSupplyEndTimestampAndCompleteStatusCode() {
- final UUID uuid = UUID.randomUUID();
- LoggingContext loggingContextForThread = VESLogger.getLoggingContextForThread(uuid.toString());
- String requestId = loggingContextForThread.get(REQUEST_ID, "default");
- String statusCode = loggingContextForThread.get("statusCode", "default");
- String endTimestamp = loggingContextForThread.get(EcompFields.kEndTimestamp, "default");
-
- assertNotNull(requestId);
- assertNotNull(endTimestamp);
- assertNotNull(statusCode);
- assertNotSame(endTimestamp, "default");
- assertEquals(requestId, uuid.toString());
- assertEquals(statusCode, "COMPLETE");
- }
-
-}
-
diff --git a/src/test/resources/dmaap-msg-router/MsgRtrApi.properties b/src/test/resources/dmaap-msg-router/MsgRtrApi.properties
new file mode 100644
index 00000000..d288bd23
--- /dev/null
+++ b/src/test/resources/dmaap-msg-router/MsgRtrApi.properties
@@ -0,0 +1,155 @@
+# LICENSE_START=======================================================
+# org.onap.dmaap
+# ================================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#
+###############################################################################
+###############################################################################
+##
+## Cambria API Server config
+##
+## Default values are shown as commented settings.
+##
+###############################################################################
+##
+## HTTP service
+##
+## 3904 is standard as of 7/29/14.
+#
+## Zookeeper Connection
+##
+## Both Cambria and Kafka make use of Zookeeper.
+##
+#config.zk.servers=172.18.1.1
+#config.zk.servers={{.Values.zookeeper.name}}:{{.Values.zookeeper.port}}
+config.zk.servers=zookeeper
+#config.zk.root=/fe3c/cambria/config
+###############################################################################
+##
+## Kafka Connection
+##
+## Items below are passed through to Kafka's producer and consumer
+## configurations (after removing "kafka.")
+## if you want to change request.required.acks it can take this one value
+#kafka.metadata.broker.list=localhost:9092,localhost:9093
+#kafka.metadata.broker.list={{.Values.kafka.name}}:{{.Values.kafka.port}}
+kafka.metadata.broker.list=kafka:9092
+##kafka.request.required.acks=-1
+#kafka.client.zookeeper=${config.zk.servers}
+consumer.timeout.ms=100
+zookeeper.connection.timeout.ms=6000
+zookeeper.session.timeout.ms=20000
+zookeeper.sync.time.ms=2000
+auto.commit.interval.ms=1000
+fetch.message.max.bytes=1000000
+auto.commit.enable=false
+#(backoff*retries > zksessiontimeout)
+kafka.rebalance.backoff.ms=10000
+kafka.rebalance.max.retries=6
+###############################################################################
+##
+## Secured Config
+##
+## Some data stored in the config system is sensitive -- API keys and secrets,
+## for example. to protect it, we use an encryption layer for this section
+## of the config.
+##
+## The key is a base64 encode AES key. This must be created/configured for
+## each installation.
+#cambria.secureConfig.key=
+##
+## The initialization vector is a 16 byte value specific to the secured store.
+## This must be created/configured for each installation.
+#cambria.secureConfig.iv=
+## Southfield Sandbox
+cambria.secureConfig.key=b/7ouTn9FfEw2PQwL0ov/Q==
+cambria.secureConfig.iv=wR9xP5k5vbz/xD0LmtqQLw==
+authentication.adminSecret=fe3cCompound
+#cambria.secureConfig.key[pc569h]=YT3XPyxEmKCTLI2NK+Sjbw==
+#cambria.secureConfig.iv[pc569h]=rMm2jhR3yVnU+u2V9Ugu3Q==
+###############################################################################
+##
+## Consumer Caching
+##
+## Kafka expects live connections from the consumer to the broker, which
+## obviously doesn't work over connectionless HTTP requests. The Cambria
+## server proxies HTTP requests into Kafka consumer sessions that are kept
+## around for later re-use. Not doing so is costly for setup per request,
+## which would substantially impact a high volume consumer's performance.
+##
+## This complicates Cambria server failover, because we often need server
+## A to close its connection before server B brings up the replacement.
+##
+## The consumer cache is normally enabled.
+#cambria.consumer.cache.enabled=true
+## Cached consumers are cleaned up after a period of disuse. The server inspects
+## consumers every sweepFreqSeconds and will clean up any connections that are
+## dormant for touchFreqMs.
+#cambria.consumer.cache.sweepFreqSeconds=15
+cambria.consumer.cache.touchFreqMs=120000
+##stickforallconsumerrequests=false
+## The cache is managed through ZK. The default value for the ZK connection
+## string is the same as config.zk.servers.
+#cambria.consumer.cache.zkConnect=${config.zk.servers}
+
+##
+## Shared cache information is associated with this node's name. The default
+## name is the hostname plus the HTTP service port this host runs on. (The
+## hostname is determined via InetAddress.getLocalHost ().getCanonicalHostName(),
+## which is not always adequate.) You can set this value explicitly here.
+##
+#cambria.api.node.identifier=<use-something-unique-to-this-instance>
+
+#cambria.rateLimit.maxEmptyPollsPerMinute=30
+#cambria.rateLimitActual.delay.ms=10
+###############################################################################
+##
+## Metrics Reporting
+##
+## This server can report its metrics periodically on a topic.
+##
+#metrics.send.cambria.enabled=true
+#metrics.send.cambria.topic=cambria.apinode.metrics #msgrtr.apinode.metrics.dmaap
+#metrics.send.cambria.sendEverySeconds=60
+cambria.consumer.cache.zkBasePath=/fe3c/cambria/consumerCache
+consumer.timeout=17
+default.partitions=3
+default.replicas=3
+##############################################################################
+#100mb
+maxcontentlength=10000
+##############################################################################
+#AAF Properties
+msgRtr.namespace.aaf=org.onap.dmaap.mr.topic
+msgRtr.topicfactory.aaf=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:
+enforced.topic.name.AAF=org.onap.dmaap.mr
+forceAAF=false
+transidUEBtopicreqd=false
+defaultNSforUEB=org.onap.dmaap.mr
+##############################################################################
+#Mirror Maker Agent
+msgRtr.mirrormakeradmin.aaf=org.onap.dmaap.mr.mirrormaker|*|admin
+msgRtr.mirrormakeruser.aaf=org.onap.dmaap.mr.mirrormaker|*|user
+msgRtr.mirrormakeruser.aaf.create=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:
+msgRtr.mirrormaker.timeout=15000
+msgRtr.mirrormaker.topic=org.onap.dmaap.mr.mirrormakeragent
+msgRtr.mirrormaker.consumergroup=mmagentserver
+msgRtr.mirrormaker.consumerid=1
+kafka.max.poll.interval.ms=300000
+kafka.heartbeat.interval.ms=60000
+kafka.session.timeout.ms=240000
+kafka.max.poll.records=1000
diff --git a/src/test/resources/dmaap-msg-router/cadi.properties b/src/test/resources/dmaap-msg-router/cadi.properties
new file mode 100644
index 00000000..f2a3cdc9
--- /dev/null
+++ b/src/test/resources/dmaap-msg-router/cadi.properties
@@ -0,0 +1,18 @@
+aaf_url=https://AAF_LOCATE_URL/onap.org.osaaf.aaf.service:2.1
+aaf_env=DEV
+aaf_lur=org.onap.aaf.cadi.aaf.v2_0.AAFLurPerm
+
+cadi_truststore=/appl/dmaapMR1/etc/org.onap.dmaap.mr.trust.jks
+cadi_truststore_password=8FyfX+ar;0$uZQ0h9*oXchNX
+
+cadi_keyfile=/appl/dmaapMR1/etc/org.onap.dmaap.mr.keyfile
+
+cadi_alias=dmaapmr@mr.dmaap.onap.org
+cadi_keystore=/appl/dmaapMR1/etc/org.onap.dmaap.mr.p12
+cadi_keystore_password=GDQttV7)BlOvWMf6F7tz&cjy
+cadi_x509_issuers=CN=intermediateCA_1, OU=OSAAF, O=ONAP, C=US:CN=intermediateCA_7, OU=OSAAF, O=ONAP, C=US:CN=intermediateCA_9, OU=OSAAF, O=ONAP, C=US
+
+cadi_loglevel=INFO
+cadi_protocols=TLSv1.1,TLSv1.2
+cadi_latitude=37.78187
+cadi_longitude=-122.26147
diff --git a/src/test/resources/dmaap-msg-router/logback.xml b/src/test/resources/dmaap-msg-router/logback.xml
new file mode 100644
index 00000000..a39d9e47
--- /dev/null
+++ b/src/test/resources/dmaap-msg-router/logback.xml
@@ -0,0 +1,207 @@
+<!--
+ ============LICENSE_START=======================================================
+ Copyright © 2019 AT&T Intellectual Property. All rights reserved.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ============LICENSE_END=========================================================
+ -->
+
+<configuration scan="true" scanPeriod="3 seconds" debug="false">
+ <contextName>${module.ajsc.namespace.name}</contextName>
+ <jmxConfigurator/>
+ <property name="logDirectory" value="${AJSC_HOME}/log"/>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <filter class="ch.qos.logback.classic.filter.LevelFilter">
+ <level>ERROR</level>
+ <onMatch>ACCEPT</onMatch>
+ <onMismatch>DENY</onMismatch>
+ </filter>
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{1024} - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <appender name="INFO" class="ch.qos.logback.core.ConsoleAppender">
+ <filter class="ch.qos.logback.classic.filter.LevelFilter">
+ <level>INFO</level>
+ <onMatch>ACCEPT</onMatch>
+ <onMismatch>DENY</onMismatch>
+ </filter>
+ </appender>
+
+ <appender name="DEBUG" class="ch.qos.logback.core.ConsoleAppender">
+
+ <encoder>
+ <pattern>"%d [%thread] %-5level %logger{1024} - %msg%n"</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ERROR" class="ch.qos.logback.core.ConsoleAppender">class="ch.qos.logback.core.ConsoleAppender">
+ <filter class="ch.qos.logback.classic.filter.LevelFilter">
+ <level>ERROR</level>
+ <onMatch>ACCEPT</onMatch>
+ <onMismatch>DENY</onMismatch>
+ </filter>
+ <encoder>
+ <pattern>"%d [%thread] %-5level %logger{1024} - %msg%n"</pattern>
+ </encoder>
+ </appender>
+
+
+ <!-- Msgrtr related loggers -->
+ <logger name="org.onap.dmaap.dmf.mr.service" level="INFO"/>
+ <logger name="org.onap.dmaap.dmf.mr.service.impl" level="INFO"/>
+
+ <logger name="org.onap.dmaap.dmf.mr.resources" level="INFO"/>
+ <logger name="org.onap.dmaap.dmf.mr.resources.streamReaders" level="INFO"/>
+
+ <logger name="org.onap.dmaap.dmf.mr.backends" level="INFO"/>
+ <logger name="org.onap.dmaap.dmf.mr.backends.kafka" level="INFO"/>
+ <logger name="org.onap.dmaap.dmf.mr.backends.memory" level="INFO"/>
+
+ <logger name="org.onap.dmaap.dmf.mr.beans" level="INFO"/>
+
+ <logger name="org.onap.dmaap.dmf.mr.constants" level="INFO"/>
+
+ <logger name="org.onap.dmaap.dmf.mr.exception" level="INFO"/>
+
+ <logger name="org.onap.dmaap.dmf.mr.listener" level="INFO"/>
+
+ <logger name="org.onap.dmaap.dmf.mr.metabroker" level="INFO"/>
+
+ <logger name="org.onap.dmaap.dmf.mr.metrics.publisher" level="INFO"/>
+ <logger name="org.onap.dmaap.dmf.mr.metrics.publisher.impl" level="INFO"/>
+
+
+ <logger name="org.onap.dmaap.dmf.mr.security" level="INFO"/>
+ <logger name="org.onap.dmaap.dmf.mr.security.impl" level="INFO"/>
+
+ <logger name="org.onap.dmaap.dmf.mr.transaction" level="INFO"/>
+ <logger name="com.att.dmf.mr.transaction.impl" level="INFO"/>
+
+ <logger name="org.onap.dmaap.dmf.mr.metabroker" level="INFO"/>
+ <logger name="org.onap.dmaap.dmf.mr.metabroker" level="INFO"/>
+
+ <logger name="org.onap.dmaap.dmf.mr.utils" level="INFO"/>
+ <logger name="org.onap.dmaap.mr.filter" level="INFO"/>
+
+ <!--<logger name="com.att.nsa.cambria.*" level="INFO" />-->
+
+ <!-- Msgrtr loggers in ajsc -->
+ <logger name="org.onap.dmaap.service" level="INFO"/>
+ <logger name="org.onap.dmaap" level="INFO"/>
+
+
+ <!-- Spring related loggers -->
+ <logger name="org.springframework" level="WARN" additivity="false"/>
+ <logger name="org.springframework.beans" level="WARN" additivity="false"/>
+ <logger name="org.springframework.web" level="WARN" additivity="false"/>
+ <logger name="com.blog.spring.jms" level="WARN" additivity="false"/>
+
+ <!-- AJSC Services (bootstrap services) -->
+ <logger name="ajsc" level="WARN" additivity="false"/>
+ <logger name="ajsc.RouteMgmtService" level="INFO" additivity="false"/>
+ <logger name="ajsc.ComputeService" level="INFO" additivity="false"/>
+ <logger name="ajsc.VandelayService" level="WARN" additivity="false"/>
+ <logger name="ajsc.FilePersistenceService" level="WARN" additivity="false"/>
+ <logger name="ajsc.UserDefinedJarService" level="WARN" additivity="false"/>
+ <logger name="ajsc.UserDefinedBeansDefService" level="WARN" additivity="false"/>
+ <logger name="ajsc.LoggingConfigurationService" level="WARN" additivity="false"/>
+
+ <!-- AJSC related loggers (DME2 Registration, csi logging, restlet, servlet
+ logging) -->
+ <logger name="ajsc.utils" level="WARN" additivity="false"/>
+ <logger name="ajsc.utils.DME2Helper" level="INFO" additivity="false"/>
+ <logger name="ajsc.filters" level="DEBUG" additivity="false"/>
+ <logger name="ajsc.beans.interceptors" level="DEBUG" additivity="false"/>
+ <logger name="ajsc.restlet" level="DEBUG" additivity="false"/>
+ <logger name="ajsc.servlet" level="DEBUG" additivity="false"/>
+ <logger name="com.att" level="WARN" additivity="false"/>
+ <logger name="com.att.ajsc.csi.logging" level="WARN" additivity="false"/>
+ <logger name="com.att.ajsc.filemonitor" level="WARN" additivity="false"/>
+
+ <logger name="com.att.nsa.dmaap.util" level="INFO" additivity="false"/>
+ <logger name="com.att.cadi.filter" level="INFO" additivity="false"/>
+
+
+ <!-- Other Loggers that may help troubleshoot -->
+ <logger name="net.sf" level="WARN" additivity="false"/>
+ <logger name="org.apache.commons.httpclient" level="WARN" additivity="false"/>
+ <logger name="org.apache.commons" level="WARN" additivity="false"/>
+ <logger name="org.apache.coyote" level="WARN" additivity="false"/>
+ <logger name="org.apache.jasper" level="WARN" additivity="false"/>
+
+ <!-- Camel Related Loggers (including restlet/servlet/jaxrs/cxf logging.
+ May aid in troubleshooting) -->
+ <logger name="org.apache.camel" level="WARN" additivity="false"/>
+ <logger name="org.apache.cxf" level="WARN" additivity="false"/>
+ <logger name="org.apache.camel.processor.interceptor" level="WARN" additivity="false"/>
+ <logger name="org.apache.cxf.jaxrs.interceptor" level="WARN" additivity="false"/>
+ <logger name="org.apache.cxf.service" level="WARN" additivity="false"/>
+ <logger name="org.restlet" level="DEBUG" additivity="false"/>
+ <logger name="org.apache.camel.component.restlet" level="DEBUG" additivity="false"/>
+ <logger name="org.apache.kafka" level="DEBUG" additivity="false"/>
+ <logger name="org.apache.zookeeper" level="INFO" additivity="false"/>
+ <logger name="org.I0Itec.zkclient" level="DEBUG" additivity="false"/>
+
+ <!-- logback internals logging -->
+ <logger name="ch.qos.logback.classic" level="INFO" additivity="false"/>
+ <logger name="ch.qos.logback.core" level="INFO" additivity="false"/>
+
+ <!-- logback jms appenders & loggers definition starts here -->
+ <!-- logback jms appenders & loggers definition starts here -->
+ <appender name="auditLogs" class="ch.qos.logback.core.ConsoleAppender">
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ </filter>
+ <encoder>
+ <pattern>"%d [%thread] %-5level %logger{1024} - %msg%n"</pattern>
+ </encoder>
+ </appender>
+ <appender name="perfLogs" class="ch.qos.logback.core.ConsoleAppender">
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ </filter>
+ <encoder>
+ <pattern>"%d [%thread] %-5level %logger{1024} - %msg%n"</pattern>
+ </encoder>
+ </appender>
+ <appender name="ASYNC-audit" class="ch.qos.logback.classic.AsyncAppender">
+ <queueSize>1000</queueSize>
+ <discardingThreshold>0</discardingThreshold>
+ <appender-ref ref="Audit-Record-Queue"/>
+ </appender>
+
+ <logger name="AuditRecord" level="INFO" additivity="FALSE">
+ <appender-ref ref="STDOUT"/>
+ </logger>
+ <logger name="AuditRecord_DirectCall" level="INFO" additivity="FALSE">
+ <appender-ref ref="STDOUT"/>
+ </logger>
+ <appender name="ASYNC-perf" class="ch.qos.logback.classic.AsyncAppender">
+ <queueSize>1000</queueSize>
+ <discardingThreshold>0</discardingThreshold>
+ <appender-ref ref="Performance-Tracker-Queue"/>
+ </appender>
+ <logger name="PerfTrackerRecord" level="INFO" additivity="FALSE">
+ <appender-ref ref="ASYNC-perf"/>
+ <appender-ref ref="perfLogs"/>
+ </logger>
+ <!-- logback jms appenders & loggers definition ends here -->
+
+ <root level="DEBUG">
+ <appender-ref ref="DEBUG"/>
+ <appender-ref ref="ERROR"/>
+ <appender-ref ref="INFO"/>
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
diff --git a/src/test/resources/dmaap-msg-router/message-router-compose.yml b/src/test/resources/dmaap-msg-router/message-router-compose.yml
new file mode 100644
index 00000000..e110a96f
--- /dev/null
+++ b/src/test/resources/dmaap-msg-router/message-router-compose.yml
@@ -0,0 +1,82 @@
+version: '2'
+services:
+ zookeeper:
+ image: nexus3.onap.org:10001/onap/dmaap/zookeeper:6.0.3
+ ports:
+ - "2181:2181"
+ environment:
+ ZOOKEEPER_REPLICAS: 1
+ ZOOKEEPER_TICK_TIME: 2000
+ ZOOKEEPER_SYNC_LIMIT: 5
+ ZOOKEEPER_INIT_LIMIT: 10
+ ZOOKEEPER_MAX_CLIENT_CNXNS: 200
+ ZOOKEEPER_AUTOPURGE_SNAP_RETAIN_COUNT: 3
+ ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: 24
+ ZOOKEEPER_CLIENT_PORT: 2181
+ KAFKA_OPTS: -Djava.security.auth.login.config=/etc/zookeeper/secrets/jaas/zk_server_jaas.conf -Dzookeeper.kerberos.removeHostFromPrincipal=true -Dzookeeper.kerberos.removeRealmFromPrincipal=true -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider -Dzookeeper.requireClientAuthScheme=sasl
+ ZOOKEEPER_SERVER_ID: 1
+ volumes:
+ - ./zk_server_jaas.conf:/etc/zookeeper/secrets/jaas/zk_server_jaas.conf
+ networks:
+ net:
+ aliases:
+ - zookeeper
+
+ kafka:
+ image: nexus3.onap.org:10001/onap/dmaap/kafka111:1.0.5
+ ports:
+ - "9092:9092"
+ environment:
+ enableCadi: 'false'
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 40000
+ KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 40000
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT://kafka:9092
+ KAFKA_LISTENERS: INTERNAL_PLAINTEXT://0.0.0.0:9092
+ KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT
+ KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
+ KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/jaas/zk_client_jaas.conf
+ KAFKA_ZOOKEEPER_SET_ACL: 'true'
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ volumes:
+ - ./zk_client_jaas.conf:/etc/kafka/secrets/jaas/zk_client_jaas.conf
+ networks:
+ net:
+ aliases:
+ - kafka
+ depends_on:
+ - zookeeper
+
+ onap-dmaap:
+ image: nexus3.onap.org:10001/onap/dmaap/dmaap-mr:1.1.20
+ ports:
+ - "3904:3904"
+ - "3905:3905"
+ environment:
+ enableCadi: 'false'
+ volumes:
+ - ./MsgRtrApi.properties:/appl/dmaapMR1/bundleconfig/etc/appprops/MsgRtrApi.properties
+ - ./logback.xml:/appl/dmaapMR1/bundleconfig/etc/logback.xml
+ - ./cadi.properties:/appl/dmaapMR1/etc/cadi.properties
+ networks:
+ net:
+ aliases:
+ - onap-dmaap
+ depends_on:
+ - zookeeper
+ - kafka
+
+ mockserver:
+ image: mockserver/mockserver:mockserver-5.11.2
+ command: -serverPort 1090 -proxyRemotePort 3904 -proxyRemoteHost onap-dmaap
+ ports:
+ - "1080:1090"
+ networks:
+ - net
+ depends_on:
+ - onap-dmaap
+
+networks:
+ net:
+ driver: bridge
diff --git a/src/test/resources/dmaap-msg-router/zk_client_jaas.conf b/src/test/resources/dmaap-msg-router/zk_client_jaas.conf
new file mode 100644
index 00000000..d4ef1eb0
--- /dev/null
+++ b/src/test/resources/dmaap-msg-router/zk_client_jaas.conf
@@ -0,0 +1,5 @@
+Client {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="kafka"
+ password="kafka_secret";
+ }; \ No newline at end of file
diff --git a/src/test/resources/dmaap-msg-router/zk_server_jaas.conf b/src/test/resources/dmaap-msg-router/zk_server_jaas.conf
new file mode 100644
index 00000000..26bf4601
--- /dev/null
+++ b/src/test/resources/dmaap-msg-router/zk_server_jaas.conf
@@ -0,0 +1,4 @@
+Server {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_kafka=kafka_secret;
+}; \ No newline at end of file
diff --git a/src/test/resources/testParseDMaaPCredentialsLegacy.json b/src/test/resources/testParseDMaaPCredentialsLegacy.json
deleted file mode 100644
index ca59c7e7..00000000
--- a/src/test/resources/testParseDMaaPCredentialsLegacy.json
+++ /dev/null
@@ -1,26 +0,0 @@
-{
- "channels": [
- {
- "name": "auth-credentials-null",
- "cambria.url": "127.0.0.1:3904",
- "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com",
- "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
- "basicAuthPassword": null,
- "basicAuthUsername": null,
- },
- {
- "name": "auth-credentials-present",
- "cambria.url": "127.0.0.1:3904",
- "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com",
- "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
- "basicAuthPassword": "samplePassword",
- "basicAuthUsername": "sampleUser",
- },
- {
- "name": "auth-credentials-missing",
- "cambria.url": "127.0.0.1:3904",
- "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com",
- "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
- }
- ]
-} \ No newline at end of file
diff --git a/src/test/resources/testParseDMaaPLegacy.json b/src/test/resources/testParseDMaaPLegacy.json
deleted file mode 100644
index 9661e30c..00000000
--- a/src/test/resources/testParseDMaaPLegacy.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
- "channels": [
- {
- "name": "url-precedes-hosts",
- "cambria.url": "127.0.0.1:3904",
- "cambria.hosts": "h1.att.com,h2.att.com",
- "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
- },
- {
- "name": "url-key-missing",
- "cambria.hosts": "h1.att.com,h2.att.com",
- "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
- },
- {
- "name": "url-is-null",
- "cambria.url": null,
- "cambria.hosts": "h1.att.com,h2.att.com",
- "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV"
- }
- ]
-} \ No newline at end of file
diff --git a/src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json b/src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json
new file mode 100644
index 00000000..0a9604d4
--- /dev/null
+++ b/src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json
@@ -0,0 +1,108 @@
+
+{
+ "eventList": [
+ {
+ "commonEventHeader": {
+ "version": "4.1",
+ "vesEventListenerVersion": "7.2",
+ "domain": "stndDefined",
+ "eventId": "stndDefined-gNB_Nokia000001",
+ "eventName": "stndDefined-gNB-Nokia-PowerLost",
+ "stndDefinedNamespace": "3GPP-FaultSupervision",
+ "startEpochMicrosec": 1413378172000000,
+ "lastEpochMicrosec": 1413378172000000,
+ "reportingEntityName": "ibcx0001vm002oam001",
+ "sourceName": "scfx0001vm002cap001",
+ "sequence": 1,
+ "priority": "High"
+ },
+ "stndDefinedFields": {
+ "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm",
+ "data": {
+ "href": 1,
+ "uri": "1",
+ "notificationId": 1,
+ "notificationType": "notifyNewAlarm",
+ "eventTime": "xyz",
+ "systemDN": "xyz",
+ "probableCause": 1,
+ "perceivedSeverity": "INDETERMINATE",
+ "rootCauseIndicator": false,
+ "specificProblem": "xyz",
+ "correlatedNotifications": [],
+ "backedUpStatus": true,
+ "backUpObject": "xyz",
+ "trendIndication": "MORE_SEVERE",
+ "thresholdInfo": {
+ "observedMeasurement": "new",
+ "observedValue": 123
+ },
+ "stateChangeDefinition": {
+ },
+ "monitoredAttributes": {
+ "newAtt": "new"
+ },
+ "proposedRepairActions": "xyz",
+ "additionalText": "xyz",
+ "additionalInformation": {
+ "addInfo": "new"
+ },
+ "alarmId": "1",
+ "alarmType": "COMMUNICATIONS_ALARM"
+ },
+ "stndDefinedFieldsVersion": "1.0"
+ }},
+ {
+ "commonEventHeader": {
+ "version": "4.1",
+ "vesEventListenerVersion": "7.2",
+ "domain": "stndDefined",
+ "eventId": "stndDefined-gNB_Nokia000001",
+ "eventName": "stndDefined-gNB-Nokia-PowerLost",
+ "stndDefinedNamespace": "3GPP-FaultSupervision2",
+ "startEpochMicrosec": 1413378172000000,
+ "lastEpochMicrosec": 1413378172000000,
+ "reportingEntityName": "ibcx0001vm002oam001",
+ "sourceName": "scfx0001vm002cap001",
+ "sequence": 1,
+ "priority": "High"
+ },
+ "stndDefinedFields": {
+ "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm",
+ "data": {
+ "href": 1,
+ "uri": "1",
+ "notificationId": 1,
+ "notificationType": "notifyNewAlarm",
+ "eventTime": "xyz",
+ "systemDN": "xyz",
+ "probableCause": 1,
+ "perceivedSeverity": "INDETERMINATE",
+ "rootCauseIndicator": false,
+ "specificProblem": "xyz",
+ "correlatedNotifications": [],
+ "backedUpStatus": true,
+ "backUpObject": "xyz",
+ "trendIndication": "MORE_SEVERE",
+ "thresholdInfo": {
+ "observedMeasurement": "new",
+ "observedValue": 123
+ },
+ "stateChangeDefinition": {
+ },
+ "monitoredAttributes": {
+ "newAtt": "new"
+ },
+ "proposedRepairActions": "xyz",
+ "additionalText": "xyz",
+ "additionalInformation": {
+ "addInfo": "new"
+ },
+ "alarmId": "1",
+ "alarmType": "COMMUNICATIONS_ALARM"
+ },
+ "stndDefinedFieldsVersion": "1.0"
+ }}
+ ]
+}
+
diff --git a/src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json b/src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json
new file mode 100644
index 00000000..7e095d56
--- /dev/null
+++ b/src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json
@@ -0,0 +1,108 @@
+
+{
+ "eventList": [
+ {
+ "commonEventHeader": {
+ "version": "4.1",
+ "vesEventListenerVersion": "7.2",
+ "domain": "stndDefined",
+ "eventId": "stndDefined-gNB_Nokia000001",
+ "eventName": "stndDefined-gNB-Nokia-PowerLost",
+ "stndDefinedNamespace": "3GPP-FaultSupervision",
+ "startEpochMicrosec": 1413378172000000,
+ "lastEpochMicrosec": 1413378172000000,
+ "reportingEntityName": "ibcx0001vm002oam001",
+ "sourceName": "scfx0001vm002cap001",
+ "sequence": 1,
+ "priority": "High"
+ },
+ "stndDefinedFields": {
+ "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm",
+ "data": {
+ "href": 1,
+ "uri": "1",
+ "notificationId": 1,
+ "notificationType": "notifyNewAlarm",
+ "eventTime": "xyz",
+ "systemDN": "xyz",
+ "probableCause": 1,
+ "perceivedSeverity": "INDETERMINATE",
+ "rootCauseIndicator": false,
+ "specificProblem": "xyz",
+ "correlatedNotifications": [],
+ "backedUpStatus": true,
+ "backUpObject": "xyz",
+ "trendIndication": "MORE_SEVERE",
+ "thresholdInfo": {
+ "observedMeasurement": "new",
+ "observedValue": 123
+ },
+ "stateChangeDefinition": {
+ },
+ "monitoredAttributes": {
+ "newAtt": "new"
+ },
+ "proposedRepairActions": "xyz",
+ "additionalText": "xyz",
+ "additionalInformation": {
+ "addInfo": "new"
+ },
+ "alarmId": "1",
+ "alarmType": "COMMUNICATIONS_ALARM"
+ },
+ "stndDefinedFieldsVersion": "1.0"
+ }},
+ {
+ "commonEventHeader": {
+ "version": "4.1",
+ "vesEventListenerVersion": "7.2",
+ "domain": "stndDefined",
+ "eventId": "stndDefined-gNB_Nokia000001",
+ "eventName": "stndDefined-gNB-Nokia-PowerLost",
+ "stndDefinedNamespace": "3GPP-FaultSupervision",
+ "startEpochMicrosec": 1413378172000000,
+ "lastEpochMicrosec": 1413378172000000,
+ "reportingEntityName": "ibcx0001vm002oam001",
+ "sourceName": "scfx0001vm002cap001",
+ "sequence": 1,
+ "priority": "High"
+ },
+ "stndDefinedFields": {
+ "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm",
+ "data": {
+ "href": 1,
+ "uri": "1",
+ "notificationId": 1,
+ "notificationType": "notifyNewAlarm",
+ "eventTime": "xyz",
+ "systemDN": "xyz",
+ "probableCause": 1,
+ "perceivedSeverity": "INDETERMINATE",
+ "rootCauseIndicator": false,
+ "specificProblem": "xyz",
+ "correlatedNotifications": [],
+ "backedUpStatus": true,
+ "backUpObject": "xyz",
+ "trendIndication": "MORE_SEVERE",
+ "thresholdInfo": {
+ "observedMeasurement": "new",
+ "observedValue": 123
+ },
+ "stateChangeDefinition": {
+ },
+ "monitoredAttributes": {
+ "newAtt": "new"
+ },
+ "proposedRepairActions": "xyz",
+ "additionalText": "xyz",
+ "additionalInformation": {
+ "addInfo": "new"
+ },
+ "alarmId": "1",
+ "alarmType": "COMMUNICATIONS_ALARM"
+ },
+ "stndDefinedFieldsVersion": "1.0"
+ }}
+ ]
+}
+
diff --git a/src/test/resources/ves7_batch_valid_two_different_domain.json b/src/test/resources/ves7_batch_valid_two_different_domain.json
new file mode 100644
index 00000000..648c81ca
--- /dev/null
+++ b/src/test/resources/ves7_batch_valid_two_different_domain.json
@@ -0,0 +1,90 @@
+
+{
+ "eventList": [
+ {
+ "commonEventHeader": {
+ "version": "4.0.1",
+ "vesEventListenerVersion": "7.0.1",
+ "domain": "fault",
+ "eventName": "Fault_Vscf:Acs-Ericcson_PilotNumberPoolExhaustion",
+ "eventId": "fault0000250",
+ "sequence": 1,
+ "priority": "High",
+ "reportingEntityId": "cc305d54-75b4-431b-adb2-eb6b9e541234",
+ "reportingEntityName": "ibcx0001vm002oam0011234",
+ "sourceId": "de305d54-75b4-431b-adb2-eb6b9e546014",
+ "sourceName": "scfx0001vm002cap001",
+ "nfVendorName": "Ericsson",
+ "nfNamingCode": "scfx",
+ "nfcNamingCode": "ssc",
+ "startEpochMicrosec": 1413378172000000,
+ "lastEpochMicrosec": 1413378172000000,
+ "timeZoneOffset": "UTC-05:30"
+ },
+ "faultFields": {
+ "faultFieldsVersion": "4.0",
+ "alarmCondition": "PilotNumberPoolExhaustion",
+ "eventSourceType": "other",
+ "specificProblem": "Calls cannot complete - pilot numbers are unavailable",
+ "eventSeverity": "CRITICAL",
+ "vfStatus": "Active",
+ "alarmAdditionalInformation": {
+ "PilotNumberPoolSize": "1000"
+ }
+ }
+ },
+ {
+ "commonEventHeader": {
+ "version": "4.1",
+ "vesEventListenerVersion": "7.2",
+ "domain": "stndDefined",
+ "eventId": "stndDefined-gNB_Nokia000001",
+ "eventName": "stndDefined-gNB-Nokia-PowerLost",
+ "stndDefinedNamespace": "3GPP-FaultSupervision",
+ "startEpochMicrosec": 1413378172000000,
+ "lastEpochMicrosec": 1413378172000000,
+ "reportingEntityName": "ibcx0001vm002oam001",
+ "sourceName": "scfx0001vm002cap001",
+ "sequence": 1,
+ "priority": "High"
+ },
+ "stndDefinedFields": {
+ "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm",
+ "data": {
+ "href": 1,
+ "uri": "1",
+ "notificationId": 1,
+ "notificationType": "notifyNewAlarm",
+ "eventTime": "xyz",
+ "systemDN": "xyz",
+ "probableCause": 1,
+ "perceivedSeverity": "INDETERMINATE",
+ "rootCauseIndicator": false,
+ "specificProblem": "xyz",
+ "correlatedNotifications": [],
+ "backedUpStatus": true,
+ "backUpObject": "xyz",
+ "trendIndication": "MORE_SEVERE",
+ "thresholdInfo": {
+ "observedMeasurement": "new",
+ "observedValue": 123
+ },
+ "stateChangeDefinition": {
+ },
+ "monitoredAttributes": {
+ "newAtt": "new"
+ },
+ "proposedRepairActions": "xyz",
+ "additionalText": "xyz",
+ "additionalInformation": {
+ "addInfo": "new"
+ },
+ "alarmId": "1",
+ "alarmType": "COMMUNICATIONS_ALARM"
+ },
+ "stndDefinedFieldsVersion": "1.0"
+ }
+ }
+ ]
+}
+