aboutsummaryrefslogtreecommitdiffstats
path: root/rest-services/cbs-client/src/main/java/org
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-03-14 14:34:35 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-03-19 15:06:15 +0100
commit565ec734f46a15bb3c87fe02a32613fc86c0eb22 (patch)
tree71412d433e9e952db2e2f9db6044dc1a9643e9c0 /rest-services/cbs-client/src/main/java/org
parent9b03823dc6ac4bec2e61a4e54d114a518dbb1100 (diff)
Kafka streams parsers - additions
Change-Id: I98ca661682b41d76d3de668d6faeb6ebe02f92a8 Issue-ID: DCAEGEN2-1341 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'rest-services/cbs-client/src/main/java/org')
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java7
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java57
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java3
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java9
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java7
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java77
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java43
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java)24
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java)24
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java)12
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java)6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java)6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java)2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java)25
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java)25
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java)14
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java)6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java)5
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java)2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java)24
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java)24
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java51
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java41
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java35
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java8
30 files changed, 436 insertions, 119 deletions
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java
index 36589dad..989bd2db 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java
@@ -20,9 +20,9 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsLookup;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import reactor.core.publisher.Mono;
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java
index 15c4eea2..dfd0e2f7 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java
@@ -19,17 +19,16 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+import static java.lang.String.valueOf;
+
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.collection.List;
-import org.jetbrains.annotations.NotNull;
-
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-
-import static java.lang.String.valueOf;
+import org.jetbrains.annotations.NotNull;
/**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java
new file mode 100644
index 00000000..4fdb31b1
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.collection.Stream;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+@ExperimentalApi
+public final class DataStreams {
+
+ private DataStreams() {
+ }
+
+ public static Stream<RawDataStream<JsonObject>> namedSources(JsonObject rootJson) {
+ return createCollectionOfStreams(rootJson, DataStreamDirection.SOURCE);
+ }
+
+ public static Stream<RawDataStream<JsonObject>> namedSinks(JsonObject rootJson) {
+ return createCollectionOfStreams(rootJson, DataStreamDirection.SINK);
+ }
+
+ private static Stream<RawDataStream<JsonObject>> createCollectionOfStreams(JsonObject rootJson, DataStreamDirection direction) {
+ final JsonElement streamsJson = rootJson.get(direction.configurationKey());
+ return streamsJson == null
+ ? Stream.empty()
+ : DataStreamUtils.mapJsonToStreams(streamsJson, direction);
+ }
+
+
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
index f18f2175..460d7100 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
@@ -21,10 +21,7 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
import com.google.gson.JsonObject;
-import io.vavr.control.Either;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
/**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
index 9d703bb3..7ae92baf 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
@@ -20,12 +20,17 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.*;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSourceParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSourceParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since March 2019
+ * @since 1.1.4
*/
public final class StreamFromGsonParsers {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
index 3467c809..69016ed8 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
@@ -26,6 +26,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
/**
* A generic data stream parser which parses {@code T} to data stream {@code S}.
@@ -33,7 +34,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Dat
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @param <T> input data type, eg. Gson Object
* @param <S> output data type
- * @since 1.1.3
+ * @since 1.1.4
*/
@ExperimentalApi
public interface StreamParser<T, S extends DataStream> {
@@ -44,7 +45,7 @@ public interface StreamParser<T, S extends DataStream> {
* @param input - the input data
* @return Right(parsing result) or Left(parsing error)
*/
- default Either<StreamParserError, S> parse(T input) {
+ default Either<StreamParserError, S> parse(RawDataStream<T> input) {
return Try.of(() -> unsafeParse(input))
.toEither()
.mapLeft(StreamParserError::fromThrowable);
@@ -58,7 +59,7 @@ public interface StreamParser<T, S extends DataStream> {
* @return parsing result
* @throws StreamParsingException when parsing was unsuccessful
*/
- default S unsafeParse(T input) {
+ default S unsafeParse(RawDataStream<T> input) {
return parse(input).getOrElseThrow(StreamParsingException::new);
}
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java
index 05bfc9be..9be08e3c 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java
@@ -24,9 +24,9 @@ import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import reactor.core.publisher.Mono;
public class CbsClientImpl implements CbsClient {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java
index 53d0bd34..89daebc8 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java
@@ -22,12 +22,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
-
import java.net.InetSocketAddress;
-
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import reactor.core.publisher.Mono;
/**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java
new file mode 100644
index 00000000..d34b1440
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.collection.Stream;
+import java.io.IOException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public final class DataStreamUtils {
+
+ public static Stream<RawDataStream<JsonObject>> mapJsonToStreams(JsonElement streamsJson,
+ DataStreamDirection direction) {
+ return Stream.ofAll(streamsJson.getAsJsonObject().entrySet())
+ .map(namedSinkJson -> {
+ final JsonObject jsonObject = namedSinkJson.getValue().getAsJsonObject();
+ return rawDataStream(namedSinkJson.getKey(), direction, jsonObject);
+ });
+ }
+
+ public static void assertStreamType(
+ RawDataStream<JsonObject> json,
+ String expectedType,
+ DataStreamDirection expectedDirection) {
+ if (!json.type().equals(expectedType)) {
+ throw new IllegalArgumentException(
+ "Invalid stream type. Expected '" + expectedType + "', but was '" + json.type() + "'");
+ }
+ if (json.direction() != expectedDirection) {
+ throw new IllegalArgumentException(
+ "Invalid stream direction. Expected '" + expectedDirection + "', but was '" + json.direction()
+ + "'");
+ }
+ }
+
+ public static RawDataStream<JsonObject> readSourceFromResource(String resource) throws IOException {
+ return rawDataStream(resource, DataStreamDirection.SOURCE, GsonUtils.readObjectFromResource(resource));
+ }
+
+ public static RawDataStream<JsonObject> readSinkFromResource(String resource) throws IOException {
+ return rawDataStream(resource, DataStreamDirection.SINK, GsonUtils.readObjectFromResource(resource));
+ }
+
+ private static RawDataStream<JsonObject> rawDataStream(String name, DataStreamDirection direction, JsonObject json) {
+ return ImmutableRawDataStream.<JsonObject>builder()
+ .name(name)
+ .direction(direction)
+ .type(GsonUtils.requiredString(json, "type"))
+ .descriptor(json)
+ .build();
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
index a0880165..0b662286 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
@@ -26,14 +26,14 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.vavr.Lazy;
-
+import io.vavr.control.Option;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Map.Entry;
import java.util.stream.Collectors;
-
-import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.GsonAdaptersMessageRouterDmaapInfo;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.GsonAdaptersKafkaInfo;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSource;
@@ -42,7 +42,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since March 2019
*/
-final class GsonUtils {
+public final class GsonUtils {
+
private static final Lazy<Gson> GSON = Lazy.of(() -> {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersKafkaInfo());
@@ -56,39 +57,35 @@ final class GsonUtils {
private GsonUtils() {
}
- static Gson gsonInstance() {
+ public static Gson gsonInstance() {
return GSON.get();
}
- static void assertStreamType(JsonObject json, String expectedType) {
- final String actualType = requiredString(json, "type");
- if (!actualType.equals(expectedType)) {
- throw new IllegalArgumentException("Invalid stream type. Expected '" + expectedType + "', but was '" + actualType + "'");
- }
- }
-
- static String requiredString(JsonObject parent, String childName) {
+ public static String requiredString(JsonObject parent, String childName) {
return requiredChild(parent, childName).getAsString();
}
- static Option<String> optionalString(JsonObject parent, String childName) {
+ public static Option<String> optionalString(JsonObject parent, String childName) {
return Option.of(parent.get(childName).getAsString());
}
- static JsonElement requiredChild(JsonObject parent, String childName) {
- if (parent.has(childName)) {
- return parent.get(childName);
- } else {
- throw new IllegalArgumentException(
- "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + stringifyChildrenNames(parent));
- }
+ public static JsonElement requiredChild(JsonObject parent, String childName) {
+ return optionalChild(parent, childName)
+ .getOrElseThrow(() -> new IllegalArgumentException(
+ "Could not find sub-node '" + childName + "'. Actual sub-nodes: "
+ + stringifyChildrenNames(parent)));
+
+ }
+
+ public static Option<JsonElement> optionalChild(JsonObject parent, String childName) {
+ return Option.of(parent.get(childName));
}
- static JsonObject readObjectFromResource(String resource) throws IOException {
+ public static JsonObject readObjectFromResource(String resource) throws IOException {
return readFromResource(resource).getAsJsonObject();
}
- static JsonElement readFromResource(String resource) throws IOException {
+ public static JsonElement readFromResource(String resource) throws IOException {
try (Reader reader = new InputStreamReader(GsonUtils.class.getResourceAsStream(resource))) {
return new JsonParser().parse(reader);
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java
index 468b6180..42b86aee 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java
@@ -17,19 +17,23 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
*/
@@ -45,12 +49,12 @@ public final class DataRouterSinkParser implements StreamFromGsonParser<DataRout
this.gson = gson;
}
- public DataRouterSink unsafeParse(JsonObject input) {
- assertStreamType(input, DATA_ROUTER_TYPE);
-
- final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+ @Override
+ public DataRouterSink unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SINK);
- return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class);
+ final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
+ return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class).withName(input.name());
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java
index d78c3dde..7d29a90f 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java
@@ -17,19 +17,23 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSource;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSource;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
*/
@@ -45,12 +49,12 @@ public final class DataRouterSourceParser implements StreamFromGsonParser<DataRo
this.gson = gson;
}
- public DataRouterSource unsafeParse(JsonObject input) {
- assertStreamType(input, DATA_ROUTER_TYPE);
-
- final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+ @Override
+ public DataRouterSource unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SOURCE);
- return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class);
+ final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
+ return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class).withName(input.name());
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java
index 10c00e72..c5d254f0 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -31,15 +31,21 @@ import java.util.Objects;
*/
abstract class GsonMessageRouter implements MessageRouter {
+ private final String name;
private final MessageRouterDmaapInfo dmaapInfo;
private final AafCredentials aafCredentials;
- GsonMessageRouter(@NotNull MessageRouterDmaapInfo dmaapInfo,
- @Nullable AafCredentials aafCredentials) {
+ GsonMessageRouter(String name, @NotNull MessageRouterDmaapInfo dmaapInfo,
+ @Nullable AafCredentials aafCredentials) {
+ this.name = name;
this.dmaapInfo = Objects.requireNonNull(dmaapInfo, "dmaapInfo");
this.aafCredentials = aafCredentials;
}
+ public String name() {
+ return name;
+ }
+
@Override
public String topicUrl() {
return dmaapInfo.topicUrl();
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java
index da218420..5eef48d9 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -30,8 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
public class GsonMessageRouterSink extends GsonMessageRouter implements MessageRouterSink {
GsonMessageRouterSink(
- @NotNull MessageRouterDmaapInfo dmaapInfo,
+ String name, @NotNull MessageRouterDmaapInfo dmaapInfo,
@Nullable AafCredentials aafCredentials) {
- super(dmaapInfo, aafCredentials);
+ super(name, dmaapInfo, aafCredentials);
}
} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java
index b69c53db..d93a1d50 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -30,8 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
public class GsonMessageRouterSource extends GsonMessageRouter implements MessageRouterSource {
GsonMessageRouterSource(
- @NotNull MessageRouterDmaapInfo dmaapInfo,
+ String name, @NotNull MessageRouterDmaapInfo dmaapInfo,
@Nullable AafCredentials aafCredentials) {
- super(dmaapInfo, aafCredentials);
+ super(name, dmaapInfo, aafCredentials);
}
} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java
index ced5ad55..0ce0f80e 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import com.google.gson.annotations.SerializedName;
import org.immutables.gson.Gson;
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java
index 56f53932..1f518fe8 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java
@@ -17,20 +17,24 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-
public final class MessageRouterSinkParser implements StreamFromGsonParser<MessageRouterSink> {
private final Gson gson;
@@ -43,15 +47,16 @@ public final class MessageRouterSinkParser implements StreamFromGsonParser<Messa
this.gson = gson;
}
- public MessageRouterSink unsafeParse(JsonObject input) {
- assertStreamType(input, MESSAGE_ROUTER_TYPE);
+ @Override
+ public MessageRouterSink unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SINK);
- final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class);
+ final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
- final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+ final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
- return new GsonMessageRouterSink(dmaapInfo, aafCredentials);
+ return new GsonMessageRouterSink(input.name(), dmaapInfo, aafCredentials);
}
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java
index 25cf9e0e..c6c1b22c 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java
@@ -17,20 +17,24 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
-
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
*/
@@ -46,15 +50,16 @@ public final class MessageRouterSourceParser implements StreamFromGsonParser<Mes
this.gson = gson;
}
- public MessageRouterSource unsafeParse(JsonObject input) {
- assertStreamType(input, MESSAGE_ROUTER_TYPE);
+ @Override
+ public MessageRouterSource unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SOURCE);
- final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class);
+ final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
- final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+ final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
- return new GsonMessageRouterSource(dmaapInfo, aafCredentials);
+ return new GsonMessageRouterSource(input.name(), dmaapInfo, aafCredentials);
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java
index 2ad37a84..ad9b021e 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
@@ -32,15 +32,23 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
*/
abstract class GsonKafka implements Kafka {
- protected final KafkaInfo kafkaInfo;
+ private final String name;
+ final KafkaInfo kafkaInfo;
private final AafCredentials aafCredentials;
- GsonKafka(@NotNull KafkaInfo kafkaInfo,
+ GsonKafka(
+ @NotNull String name,
+ @NotNull KafkaInfo kafkaInfo,
@Nullable AafCredentials aafCredentials) {
+ this.name = Objects.requireNonNull(name, "name");
this.kafkaInfo = Objects.requireNonNull(kafkaInfo, "kafkaInfo");
this.aafCredentials = aafCredentials;
}
+ public String name() {
+ return name;
+ }
+
@Override
public String bootstrapServers() {
return kafkaInfo.bootstrapServers();
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java
index c45f8470..4990f80a 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -32,8 +32,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
class GsonKafkaSink extends GsonKafka implements KafkaSink {
GsonKafkaSink(
+ @NotNull String name,
@NotNull KafkaInfo kafkaInfo,
@Nullable AafCredentials aafCredentials) {
- super(kafkaInfo, aafCredentials);
+ super(name, kafkaInfo, aafCredentials);
}
+
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java
index 1509d9d7..137964c3 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -32,9 +32,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
class GsonKafkaSource extends GsonKafka implements KafkaSource {
GsonKafkaSource(
+ @NotNull String name,
@NotNull KafkaInfo kafkaInfo,
@Nullable AafCredentials aafCredentials) {
- super(kafkaInfo, aafCredentials);
+ super(name, kafkaInfo, aafCredentials);
}
@Override
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java
index 8b17a8d4..fd5602e6 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import com.google.gson.annotations.SerializedName;
import org.immutables.gson.Gson;
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java
index f9a546c2..59373c45 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java
@@ -18,16 +18,21 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo;
import com.google.gson.Gson;
-import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
/**
@@ -46,12 +51,13 @@ public final class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> {
}
@Override
- public KafkaSink unsafeParse(JsonObject input) {
- assertStreamType(input, KAFKA_TYPE);
+ public KafkaSink unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SINK);
+ final JsonObject json = input.descriptor();
- final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME);
- final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+ final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
+ final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull();
- return new GsonKafkaSink(kafkaInfo, null);
+ return new GsonKafkaSink(input.name(), kafkaInfo, aafCreds);
}
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java
index 08c02b47..6ac1dc99 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java
@@ -18,16 +18,21 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo;
import com.google.gson.Gson;
-import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
/**
@@ -46,12 +51,13 @@ public final class KafkaSourceParser implements StreamFromGsonParser<KafkaSource
}
@Override
- public KafkaSource unsafeParse(JsonObject input) {
- assertStreamType(input, KAFKA_TYPE);
+ public KafkaSource unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SOURCE);
+ final JsonObject json = input.descriptor();
- final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME);
- final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+ final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
+ final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull();
- return new GsonKafkaSource(kafkaInfo, null);
+ return new GsonKafkaSource(input.name(), kafkaInfo, aafCreds);
}
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java
new file mode 100644
index 00000000..4cfa33ac
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.optionalChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+final class KafkaUtils {
+
+ private KafkaUtils() {
+ }
+
+ static KafkaInfo extractKafkaInfo(Gson gson, JsonObject input) {
+ final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info");
+ return gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+ }
+
+ static Option<AafCredentials> extractAafCredentials(Gson gson, JsonObject input) {
+ return optionalChild(input, "aaf_credentials")
+ .map(aafCredsJson -> gson.fromJson(aafCredsJson, ImmutableAafCredentials.class));
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
index e8d63192..c3c70b78 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
@@ -36,9 +36,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
@Gson.TypeAdapters
public interface AafCredentials {
- @SerializedName("aaf_username")
+ @SerializedName(value = "username", alternate = "aaf_username")
@Nullable String username();
- @SerializedName("aaf_password")
+ @SerializedName(value = "password", alternate = "aaf_password")
@Nullable String password();
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java
index 43d9d726..37bf7e57 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
@@ -28,5 +29,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
*/
@ExperimentalApi
public interface DataStream {
-
+ @Value.Default
+ default String name() {
+ return "";
+ }
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java
new file mode 100644
index 00000000..f3cac547
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public enum DataStreamDirection {
+
+ SINK("streams_publishes"),
+ SOURCE("streams_subscribes");
+
+ private final String configurationKey;
+
+ DataStreamDirection(String configurationKey) {
+ this.configurationKey = configurationKey;
+ }
+
+ public String configurationKey() {
+ return configurationKey;
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java
new file mode 100644
index 00000000..7a39ede5
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+@Value.Immutable
+public interface RawDataStream<T> {
+ String name();
+ String type();
+ DataStreamDirection direction();
+ T descriptor();
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
index 97f07a29..1810fc6c 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
@@ -20,6 +20,9 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+import static io.vavr.Predicates.not;
+
+import io.vavr.collection.List;
import org.immutables.value.Value;
import org.jetbrains.annotations.Nullable;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
@@ -46,4 +49,9 @@ public interface Kafka {
default int maxPayloadSizeBytes() {
return 1024 * 1024;
}
+
+ @Value.Derived
+ default List<String> bootstrapServerList() {
+ return List.of(bootstrapServers().split(",")).filter(not(String::isEmpty));
+ }
}