summaryrefslogtreecommitdiffstats
path: root/rest-services/cbs-client/src/test
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-03-25 15:32:42 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-03-26 12:55:04 +0100
commit015668f4a24fcc497151c6142a0bf70717c55f8e (patch)
tree6af75819471a378f203d4849844d2cfb384df444 /rest-services/cbs-client/src/test
parent51d3ae2b08dd49029cd9c86bfe8d95d1eef14326 (diff)
Add streams parsing integration tests
Change-Id: I22410b3fb110e47bde123556951bb12af5f34a1c Issue-ID: DCAEGEN2-1315 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'rest-services/cbs-client/src/test')
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java151
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java204
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java23
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java103
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java27
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java29
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java29
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java29
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java7
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java7
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/integration_message_router.json62
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json80
12 files changed, 666 insertions, 85 deletions
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java
new file mode 100644
index 00000000..c57ce027
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java
@@ -0,0 +1,151 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName;
+
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class MessageRouterSinksIT {
+
+ final JsonObject json = GsonUtils.readObjectFromResource("/streams/integration_message_router.json");
+
+ MessageRouterSinksIT() throws IOException {
+ }
+
+ @Test
+ void thereShouldBeNoDataSources() {
+ assertThat(DataStreams.namedSources(json)).isEmpty();
+ }
+
+ @Test
+ void thereShouldBeSomeSinksDefined() {
+ assertThat(DataStreams.namedSinks(json)).isNotEmpty();
+ assertThat(DataStreams.namedSinks(json)).hasSize(4);
+ }
+
+ @Test
+ void allSinksShouldBeOfMessageRouterType() {
+ assertThat(DataStreams.namedSinks(json).map(RawDataStream::type).distinct())
+ .containsExactly(StreamType.MESSAGE_ROUTER);
+ }
+
+ @Test
+ void sinksShouldHaveProperDirection() {
+ assertThat(DataStreams.namedSinks(json).map(RawDataStream::direction).distinct())
+ .containsExactly(DataStreamDirection.SINK);
+ }
+
+ @Test
+ void verifySecMeasurementSink() {
+ // given
+ final String streamName = "sec_measurement";
+ final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNotNull();
+ assertThat(parsedSink.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username");
+ assertThat(parsedSink.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password");
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+ assertThat(parsedSink.clientId()).describedAs("client id").isEqualTo("111111");
+ assertThat(parsedSink.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member");
+ assertThat(parsedSink.topicUrl()).describedAs("topic url")
+ .isEqualTo("https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-MEASUREMENT-OUTPUT");
+ }
+
+ @Test
+ void verifySecFaultUnsecureSink() {
+ // given
+ final String streamName = "sec_fault_unsecure";
+ final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNull();
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+ assertThat(parsedSink.clientId()).describedAs("client id").isNull();
+ assertThat(parsedSink.clientRole()).describedAs("client role").isNull();
+ assertThat(parsedSink.topicUrl()).describedAs("topic url")
+ .isEqualTo("http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV");
+ }
+
+ @Test
+ void verifySecMeasurementUnsecureSink() {
+ // given
+ final String streamName = "sec_measurement_unsecure";
+ final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNull();
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+ assertThat(parsedSink.clientId()).describedAs("client id").isNull();
+ assertThat(parsedSink.clientRole()).describedAs("client role").isNull();
+ assertThat(parsedSink.topicUrl()).describedAs("topic url")
+ .isEqualTo("http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV");
+ }
+
+ @Test
+ void verifySecFaultSink() {
+ // given
+ final String streamName = "sec_fault";
+ final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNotNull();
+ assertThat(parsedSink.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username");
+ assertThat(parsedSink.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password");
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+ assertThat(parsedSink.clientId()).describedAs("client id").isEqualTo("222222");
+ assertThat(parsedSink.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member");
+ assertThat(parsedSink.topicUrl()).describedAs("topic url")
+ .isEqualTo("https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-FAULT-OUTPUT");
+ }
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java
new file mode 100644
index 00000000..4508939a
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java
@@ -0,0 +1,204 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName;
+
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class MixedDmaapStreamsIT {
+
+ final JsonObject json = GsonUtils.readObjectFromResource("/streams/integration_mixed_dmaap.json");
+ final List<RawDataStream<JsonObject>> sources = DataStreams.namedSources(json).toList();
+ final List<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json).toList();
+
+ MixedDmaapStreamsIT() throws IOException {
+ }
+
+ @Test
+ void thereShouldBeSomeSinksDefined() {
+ assertThat(sinks).isNotEmpty();
+ assertThat(sinks).hasSize(3);
+ }
+
+ @Test
+ void thereShouldBeSomeSourcesDefined() {
+ assertThat(sources).isNotEmpty();
+ assertThat(sources).hasSize(3);
+ }
+
+ @Test
+ void allStreamsShouldBeOfProperType() {
+ assertThat(sources.map(RawDataStream::type).distinct()).containsExactly(StreamType.DATA_ROUTER, StreamType.MESSAGE_ROUTER);
+ assertThat(sinks.map(RawDataStream::type).distinct()).containsExactly(StreamType.DATA_ROUTER);
+ }
+
+ @Test
+ void sinksShouldHaveProperDirection() {
+ assertThat(sinks.map(RawDataStream::direction).distinct())
+ .containsExactly(DataStreamDirection.SINK);
+ }
+
+ @Test
+ void sourcesShouldHaveProperDirection() {
+ assertThat(sources.map(RawDataStream::direction).distinct())
+ .containsExactly(DataStreamDirection.SOURCE);
+ }
+
+ @Test
+ void verifyDcaeGuestOsSource() {
+ // given
+ final String streamName = "DCAE_GUEST_OS";
+ final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSource parsedSource = StreamFromGsonParsers.dataRouterSourceParser().unsafeParse(source);
+
+ // then
+ assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSource.username()).describedAs("user name").isEqualTo("xyz");
+ assertThat(parsedSource.password()).describedAs("password").isEqualTo("abc");
+ assertThat(parsedSource.deliveryUrl()).describedAs("delivery url")
+ .isEqualTo("https://dr.global:8666/DCAE_SAM_GUEST_OS");
+ assertThat(parsedSource.subscriberId()).describedAs("subscriber id").isEqualTo("811");
+ }
+
+ @Test
+ void verifyDcaeRawDataSource() {
+ // given
+ final String streamName = "DCAE_RAW_DATA";
+ final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSource parsedSource = StreamFromGsonParsers.dataRouterSourceParser().unsafeParse(source);
+
+ // then
+ assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSource.username()).describedAs("user name").isEqualTo("abc");
+ assertThat(parsedSource.password()).describedAs("password").isEqualTo("xyz");
+ assertThat(parsedSource.deliveryUrl()).describedAs("delivery url")
+ .isEqualTo("https://dr.global:8666/DCAE_CEILOMETER_RAW_DATA");
+ assertThat(parsedSource.subscriberId()).describedAs("subscriber id").isEqualTo("812");
+ }
+
+ @Test
+ void verifySecMeasurementOutputSource() {
+ // given
+ final String streamName = "sec-measurement-output";
+ final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName))
+ .get();
+
+ // when
+ final MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
+
+ // then
+ assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSource.aafCredentials()).describedAs("aaf credentials").isNotNull();
+ assertThat(parsedSource.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username");
+ assertThat(parsedSource.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password");
+ assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSource.clientId()).describedAs("client id").isEqualTo("1111");
+ assertThat(parsedSource.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member");
+ assertThat(parsedSource.topicUrl()).describedAs("topic url")
+ .isEqualTo("https://mr.hostname:3905/events/com.att.dcae.dmaap.SEC-MEASUREMENT-OUTPUT-v1");
+ }
+
+ @Test
+ void verifyDcaeVoipPmDataSink() {
+ // given
+ final String streamName = "DCAE_VOIP_PM_DATA";
+ final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSink.username()).describedAs("user name").isEqualTo("abc");
+ assertThat(parsedSink.password()).describedAs("password").isEqualTo("xyz");
+ assertThat(parsedSink.logUrl()).describedAs("log url")
+ .isEqualTo("https://dcae-drps/feedlog/206");
+ assertThat(parsedSink.publishUrl()).describedAs("publish url")
+ .isEqualTo("https://dcae-drps/publish/206");
+ assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("206.518hu");
+ }
+
+ @Test
+ void verifyDcaeGuestOsOSink() {
+ // given
+ final String streamName = "DCAE_GUEST_OS_O";
+ final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23");
+ assertThat(parsedSink.username()).describedAs("user name").isEqualTo("axyz");
+ assertThat(parsedSink.password()).describedAs("password").isEqualTo("abc");
+ assertThat(parsedSink.logUrl()).describedAs("log url")
+ .isEqualTo("https://dcae-drps/feedlog/203");
+ assertThat(parsedSink.publishUrl()).describedAs("publish url")
+ .isEqualTo("https://dcae-drps/publish/203");
+ assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("203.2od8s");
+ }
+
+
+ @Test
+ void verifyDcaePmDataSink() {
+ // given
+ final String streamName = "DCAE_PM_DATA";
+ final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get();
+
+ // when
+ final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink);
+
+ // then
+ assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+ assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23bdce2");
+ assertThat(parsedSink.username()).describedAs("user name").isEqualTo("xyz");
+ assertThat(parsedSink.password()).describedAs("password").isEqualTo("abc");
+ assertThat(parsedSink.logUrl()).describedAs("log url")
+ .isEqualTo("https://dcae-drps/feedlog/493");
+ assertThat(parsedSink.publishUrl()).describedAs("publish url")
+ .isEqualTo("https://dcae-drps/publish/493");
+ assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("493.eacqs");
+ }
+
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
index a51b87aa..a296c920 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
@@ -23,9 +23,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
import static org.assertj.core.api.Assertions.assertThat;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType;
+import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA;
+import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.MESSAGE_ROUTER;
import com.google.gson.JsonObject;
-import io.vavr.collection.Map;
import io.vavr.collection.Stream;
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
@@ -42,10 +44,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.Strea
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -182,12 +184,11 @@ class CbsClientImplIT {
// when
final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
.map(json -> {
- final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
- .groupBy(RawDataStream::type);
+ final Stream<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json);
- final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty())
+ final Stream<KafkaSink> allKafkaSinks = sinks.filter(streamOfType(KAFKA))
.map(kafkaSinkParser::unsafeParse);
- final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty())
+ final Stream<MessageRouterSink> allMrSinks = sinks.filter(streamOfType(MESSAGE_ROUTER))
.map(mrSinkParser::unsafeParse);
assertThat(allKafkaSinks.size())
@@ -225,8 +226,8 @@ class CbsClientImplIT {
.expectErrorSatisfies(ex -> {
assertThat(ex).isInstanceOf(StreamParsingException.class);
assertThat(ex).hasMessageContaining("Invalid stream type");
- assertThat(ex).hasMessageContaining("message_router");
- assertThat(ex).hasMessageContaining("kafka");
+ assertThat(ex).hasMessageContaining(MESSAGE_ROUTER.toString());
+ assertThat(ex).hasMessageContaining(KAFKA.toString());
})
.verify(Duration.ofSeconds(5));
}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java
new file mode 100644
index 00000000..a26af446
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java
@@ -0,0 +1,103 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.GsonAdaptersAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class DmaapUtilsTest {
+
+ @Test
+ void extractAafCredentials_shouldReturnNull_whenAllFieldsAreNull() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"aaf_username\":null,\"aaf_password\":null}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void extractAafCredentials_shouldReturnNull_whenAllFieldsAreAbsent() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"whatever\":\"else\"}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void extractAafCredentials_shouldReturnValue_whenBothFieldsAreSet() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\",\"aaf_password\":\"passwd\"}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").password("passwd").build());
+ }
+
+ @Test
+ void extractAafCredentials_shouldReturnValueWithUser_whenOnlyUserIsSet() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\"}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").build());
+ }
+
+ @Test
+ void extractAafCredentials_shouldReturnValueWithUser_whenPasswordIsNull() {
+ // given
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+ JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\",\"aaf_password\":null}", JsonObject.class);
+
+ // when
+ final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+ // then
+ assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").build());
+ }
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java
index 7092de5a..90c69942 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java
@@ -19,25 +19,22 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
-import com.google.gson.Gson;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonObject;
import io.vavr.control.Either;
+import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSink;
class DataRouterSinkParserTest {
@@ -101,8 +98,8 @@ class DataRouterSinkParserTest {
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
assertThat(error.message()).contains("Invalid stream type");
- assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '"
- + MESSAGE_ROUTER_TYPE + "'");
+ assertThat(error.message()).contains("Expected '" + StreamType.DATA_ROUTER + "', but was '"
+ + StreamType.MESSAGE_ROUTER + "'");
}
);
}
@@ -113,7 +110,7 @@ class DataRouterSinkParserTest {
JsonObject json = new JsonObject();
final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
.name("empty")
- .type("data_router")
+ .type(StreamType.DATA_ROUTER)
.descriptor(json)
.direction(DataStreamDirection.SINK)
.build();
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java
index b2d01309..f704e523 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java
@@ -19,27 +19,22 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonObject;
import io.vavr.control.Either;
+import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSource;
public class DataRouterSourceParserTest {
@@ -100,8 +95,8 @@ public class DataRouterSourceParserTest {
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
assertThat(error.message()).contains("Invalid stream type");
- assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '"
- + MESSAGE_ROUTER_TYPE + "'");
+ assertThat(error.message()).contains("Expected '" + StreamType.DATA_ROUTER + "', but was '"
+ + StreamType.MESSAGE_ROUTER + "'");
}
);
}
@@ -112,7 +107,7 @@ public class DataRouterSourceParserTest {
JsonObject json = new JsonObject();
final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
.name("empty")
- .type("data_router")
+ .type(StreamType.DATA_ROUTER)
.descriptor(json)
.direction(DataStreamDirection.SOURCE)
.build();
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java
index 4d3b88b8..e3182c5c 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java
@@ -19,25 +19,21 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
-import com.google.gson.Gson;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonObject;
import io.vavr.control.Either;
+import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -83,8 +79,7 @@ public class MessageRouterSinkParserTest {
// then
assertThat(result).isInstanceOf(MessageRouterSink.class);
assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
- assertThat(result.aafCredentials().username()).isNull();
- assertThat(result.aafCredentials().password()).isNull();
+ assertThat(result.aafCredentials()).isNull();
assertThat(result.clientId()).isNull();
}
@@ -100,8 +95,8 @@ public class MessageRouterSinkParserTest {
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
assertThat(error.message()).contains("Invalid stream type");
- assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '"
- + DATA_ROUTER_TYPE + "'");
+ assertThat(error.message()).contains("Expected '" + StreamType.MESSAGE_ROUTER + "', but was '"
+ + StreamType.DATA_ROUTER + "'");
}
);
}
@@ -112,7 +107,7 @@ public class MessageRouterSinkParserTest {
JsonObject json = new JsonObject();
final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
.name("empty")
- .type("data_router")
+ .type(StreamType.MESSAGE_ROUTER)
.descriptor(json)
.direction(DataStreamDirection.SINK)
.build();
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java
index d497817f..51e56764 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java
@@ -19,25 +19,21 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
-import com.google.gson.Gson;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonObject;
import io.vavr.control.Either;
+import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -81,8 +77,7 @@ public class MessageRouterSourceParserTest {
// then
assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
- assertThat(result.aafCredentials().username()).isNull();
- assertThat(result.aafCredentials().password()).isNull();
+ assertThat(result.aafCredentials()).isNull();
assertThat(result.clientId()).isNull();
}
@@ -98,8 +93,8 @@ public class MessageRouterSourceParserTest {
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
assertThat(error.message()).contains("Invalid stream type");
- assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '"
- + DATA_ROUTER_TYPE + "'");
+ assertThat(error.message()).contains("Expected '" + StreamType.MESSAGE_ROUTER + "', but was '"
+ + StreamType.DATA_ROUTER + "'");
}
);
}
@@ -110,7 +105,7 @@ public class MessageRouterSourceParserTest {
JsonObject json = new JsonObject();
final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
.name("empty")
- .type("data_router")
+ .type(StreamType.MESSAGE_ROUTER)
.descriptor(json)
.direction(DataStreamDirection.SOURCE)
.build();
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java
index 5974639c..2e4f71b3 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java
@@ -30,10 +30,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.St
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java
index d255d99a..1e8e3f52 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java
@@ -31,10 +31,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.St
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json b/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json
new file mode 100644
index 00000000..d38b0cce
--- /dev/null
+++ b/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json
@@ -0,0 +1,62 @@
+{
+ "collector.schema.file": "./etc/CommonEventFormat_27.2.json",
+ "collector.service.port": 8080,
+ "collector.dmaap.streamid": "fault=sec_fault,roadm-sec-to-hp|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert",
+ "collector.schema.checkflag": 1,
+ "tomcat.maxthreads": "200",
+ "collector.keystore.passwordfile": "/opt/app/dcae-certificate/.password",
+ "streams_subscribes": {},
+ "services_calls": {},
+ "collector.inputQueue.maxPending": 8096,
+ "header.authflag": 0,
+ "collector.keystore.file.location": "/opt/app/dcae-certificate/keystore.jks",
+ "collector.service.secure.port": -1,
+ "header.authlist": "userid1,base64encodepwd1|userid2,base64encodepwd2",
+ "collector.keystore.alias": "dynamically generated",
+ "streams_publishes": {
+ "sec_measurement": {
+ "type": "message_router",
+ "aaf_password": "aaf_password",
+ "dmaap_info": {
+ "location": "mtl5",
+ "client_id": "111111",
+ "client_role": "com.att.dcae.member",
+ "topic_url": "https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-MEASUREMENT-OUTPUT"
+ },
+ "aaf_username": "aaf_username"
+ },
+ "sec_fault_unsecure": {
+ "type": "message_router",
+ "aaf_password": null,
+ "dmaap_info": {
+ "location": "mtl5",
+ "client_id": null,
+ "client_role": null,
+ "topic_url": "http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+ },
+ "aaf_username": null
+ },
+ "sec_measurement_unsecure": {
+ "type": "message_router",
+ "aaf_password": null,
+ "dmaap_info": {
+ "location": "mtl5",
+ "client_id": null,
+ "client_role": null,
+ "topic_url": "http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+ },
+ "aaf_username": null
+ },
+ "sec_fault": {
+ "type": "message_router",
+ "aaf_password": "aaf_password",
+ "dmaap_info": {
+ "location": "mtl5",
+ "client_id": "222222",
+ "client_role": "com.att.dcae.member",
+ "topic_url": "https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-FAULT-OUTPUT"
+ },
+ "aaf_username": "aaf_username"
+ }
+ }
+}
diff --git a/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json b/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json
new file mode 100644
index 00000000..acc7b987
--- /dev/null
+++ b/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json
@@ -0,0 +1,80 @@
+{
+
+ "streams_subscribes": {
+ "DCAE_GUEST_OS": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "xyz",
+ "password": "abc",
+ "location": "mtn23",
+ "delivery_url": "https://dr.global:8666/DCAE_SAM_GUEST_OS",
+ "subscriber_id": "811"
+ }
+ },
+ "DCAE_RAW_DATA": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "abc",
+ "password": "xyz",
+ "location": "mtn23",
+ "delivery_url": "https://dr.global:8666/DCAE_CEILOMETER_RAW_DATA",
+ "subscriber_id": "812"
+ }
+ },
+ "sec-measurement-output": {
+ "type": "message_router",
+ "aaf_password": "aaf_password",
+ "dmaap_info": {
+ "topic_url": "https://mr.hostname:3905/events/com.att.dcae.dmaap.SEC-MEASUREMENT-OUTPUT-v1",
+ "client_role": "com.att.dcae.member",
+ "location": "mtn23",
+ "client_id": "1111"
+ },
+ "aaf_username": "aaf_username"
+
+ }
+
+ },
+
+ "streams_publishes": {
+
+ "DCAE_VOIP_PM_DATA": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "abc",
+ "log_url": "https://dcae-drps/feedlog/206",
+ "publish_url": "https://dcae-drps/publish/206",
+ "location": "mtn23",
+ "password": "xyz",
+ "publisher_id": "206.518hu"
+
+ }
+ },
+
+ "DCAE_GUEST_OS_O": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "axyz",
+ "log_url": "https://dcae-drps/feedlog/203",
+ "publish_url": "https://dcae-drps/publish/203",
+ "location": "mtn23",
+ "password": "abc",
+
+ "publisher_id": "203.2od8s"
+ }
+ },
+
+ "DCAE_PM_DATA": {
+ "type": "data_router",
+ "dmaap_info": {
+ "username": "xyz",
+ "log_url": "https://dcae-drps/feedlog/493",
+ "publish_url": "https://dcae-drps/publish/493",
+ "location": "mtn23bdce2",
+ "password": "abc",
+ "publisher_id": "493.eacqs"
+ }
+ }
+ }
+
+} \ No newline at end of file