summaryrefslogtreecommitdiffstats
path: root/rest-services/cbs-client/src
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-03-14 08:38:15 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-03-14 13:58:20 +0100
commitc852448b1e6ca5e28e0b0dab26c2af3d5af3f390 (patch)
treebf50c6ddbb24eab14f4da96849a74c6088f1e338 /rest-services/cbs-client/src
parent9eb05b612d185d00ad6ad9deffc5a3ab5cf91a1e (diff)
Implement Kafka stream definition parsers
Change-Id: I43215c1c2494b6036deb004891fb76bfd2b74474 Issue-ID: DCAEGEN2-1341 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'rest-services/cbs-client/src')
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java9
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java37
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java44
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java34
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java92
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java39
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java45
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java89
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java56
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java58
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java57
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java49
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java35
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java37
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java115
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java60
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/kafka_sink.json9
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json7
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/kafka_sink_minimal.json7
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/kafka_sink_missing_child.json3
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/kafka_source_minimal.json7
24 files changed, 889 insertions, 8 deletions
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java
index 214f0e4a..cbdea005 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions;
+import io.vavr.control.Either;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
@@ -30,6 +31,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
public class StreamParserError {
private final String message;
+ public static <R> Either<StreamParserError, R> left(Throwable ex) {
+ return Either.left(fromThrowable(ex));
+ }
+
+ public static StreamParserError fromThrowable(Throwable ex) {
+ return new StreamParserError(ex.getMessage());
+ }
+
public StreamParserError(String message) {
this.message = message;
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
new file mode 100644
index 00000000..f18f2175
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import com.google.gson.JsonObject;
+import io.vavr.control.Either;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+@ExperimentalApi
+public interface StreamFromGsonParser<S extends DataStream> extends StreamParser<JsonObject, S> {
+
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
new file mode 100644
index 00000000..4b0223fd
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.KafkaSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.KafkaSourceParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public final class StreamFromGsonParsers {
+
+ private StreamFromGsonParsers() {
+ }
+
+ public static StreamFromGsonParser<KafkaSink> kafkaSinkParser() {
+ return KafkaSinkParser.create();
+ }
+
+ public static StreamFromGsonParser<KafkaSource> kafkaSourceParser() {
+ return KafkaSourceParser.create();
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
index 9ba7047a..3467c809 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
@@ -21,22 +21,44 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
import io.vavr.control.Either;
+import io.vavr.control.Try;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
/**
+ * A generic data stream parser which parses {@code T} to data stream {@code S}.
+ *
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.2
+ * @param <T> input data type, eg. Gson Object
+ * @param <S> output data type
+ * @since 1.1.3
*/
@ExperimentalApi
-public interface StreamParser<S extends DataStream> {
+public interface StreamParser<T, S extends DataStream> {
- Either<StreamParserError, S> parse(MerkleTree<String> subtree);
+ /**
+ * Parse the input data {@code T} producing the {@link DataStream}.
+ *
+ * @param input - the input data
+ * @return Right(parsing result) or Left(parsing error)
+ */
+ default Either<StreamParserError, S> parse(T input) {
+ return Try.of(() -> unsafeParse(input))
+ .toEither()
+ .mapLeft(StreamParserError::fromThrowable);
+ }
- default S unsafeParse(MerkleTree<String> subtree) {
- return parse(subtree).getOrElseThrow(StreamParsingException::new);
+ /**
+ * Parse the input data {@code T} producing the {@link DataStream}. Will throw StreamParsingException when input
+ * was invalid.
+ *
+ * @param input - the input data
+ * @return parsing result
+ * @throws StreamParsingException when parsing was unsuccessful
+ */
+ default S unsafeParse(T input) {
+ return parse(input).getOrElseThrow(StreamParsingException::new);
}
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java
new file mode 100644
index 00000000..ecafd30c
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java
@@ -0,0 +1,92 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import java.util.Objects;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+abstract class GsonKafka implements Kafka {
+
+ protected final KafkaInfo kafkaInfo;
+ private final AafCredentials aafCredentials;
+
+ GsonKafka(@NotNull KafkaInfo kafkaInfo,
+ @Nullable AafCredentials aafCredentials) {
+ this.kafkaInfo = Objects.requireNonNull(kafkaInfo, "kafkaInfo");
+ this.aafCredentials = aafCredentials;
+ }
+
+ @Override
+ public String bootstrapServers() {
+ return kafkaInfo.bootstrapServers();
+ }
+
+ @Override
+ public String topicName() {
+ return kafkaInfo.topicName();
+ }
+
+ @Override
+ public @Nullable AafCredentials aafCredentials() {
+ return aafCredentials;
+ }
+
+ @Override
+ public @Nullable String clientRole() {
+ return kafkaInfo.clientRole();
+ }
+
+ @Override
+ public @Nullable String clientId() {
+ return kafkaInfo.clientId();
+ }
+
+ @Override
+ public int maxPayloadSizeBytes() {
+ return kafkaInfo.maxPayloadSizeBytes();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GsonKafka gsonKafka = (GsonKafka) o;
+ return kafkaInfo.equals(gsonKafka.kafkaInfo) &&
+ Objects.equals(aafCredentials, gsonKafka.aafCredentials);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(kafkaInfo, aafCredentials);
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java
new file mode 100644
index 00000000..c45f8470
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class GsonKafkaSink extends GsonKafka implements KafkaSink {
+
+ GsonKafkaSink(
+ @NotNull KafkaInfo kafkaInfo,
+ @Nullable AafCredentials aafCredentials) {
+ super(kafkaInfo, aafCredentials);
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java
new file mode 100644
index 00000000..1509d9d7
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class GsonKafkaSource extends GsonKafka implements KafkaSource {
+
+ GsonKafkaSource(
+ @NotNull KafkaInfo kafkaInfo,
+ @Nullable AafCredentials aafCredentials) {
+ super(kafkaInfo, aafCredentials);
+ }
+
+ @Override
+ public @Nullable String consumerGroupId() {
+ return kafkaInfo.consumerGroupId();
+ }
+
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
new file mode 100644
index 00000000..a813607e
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
@@ -0,0 +1,89 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.vavr.Lazy;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+final class GsonUtils {
+ private static final Lazy<Gson> GSON = Lazy.of(() -> {
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersKafkaInfo());
+ gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersAafCredentials());
+ return gsonBuilder.create();
+ });
+
+ private GsonUtils() {
+ }
+
+ static Gson gsonInstance() {
+ return GSON.get();
+ }
+
+ static void assertStreamType(JsonObject json, String expectedType) {
+ final String actualType = requiredString(json, "type");
+ if (!actualType.equals(expectedType)) {
+ throw new IllegalArgumentException("Invalid stream type. Expected '" + expectedType + "', but was '" + actualType + "'");
+ }
+ }
+
+ static String requiredString(JsonObject parent, String childName) {
+ return requiredChild(parent, childName).getAsString();
+ }
+
+ static JsonElement requiredChild(JsonObject parent, String childName) {
+ if (parent.has(childName)) {
+ return parent.get(childName);
+ } else {
+ throw new IllegalArgumentException(
+ "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + stringifyChildrenNames(parent));
+ }
+ }
+
+ static JsonObject readObjectFromResource(String resource) throws IOException {
+ return readFromResource(resource).getAsJsonObject();
+ }
+
+ static JsonElement readFromResource(String resource) throws IOException {
+ try (Reader reader = new InputStreamReader(GsonUtils.class.getResourceAsStream(resource))) {
+ return new JsonParser().parse(reader);
+ }
+ }
+
+ private static String stringifyChildrenNames(JsonObject parent) {
+ return parent.entrySet().stream().map(Entry::getKey).collect(Collectors.joining(", "));
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java
new file mode 100644
index 00000000..8b17a8d4
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.annotations.SerializedName;
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+@Value.Immutable
+@Gson.TypeAdapters
+public interface KafkaInfo {
+
+ @SerializedName("bootstrap_servers")
+ String bootstrapServers();
+
+ @SerializedName("topic_name")
+ String topicName();
+
+ @SerializedName("consumer_group_id")
+ @Nullable String consumerGroupId();
+
+ @SerializedName("client_role")
+ @Nullable String clientRole();
+
+ @SerializedName("client_id")
+ @Nullable String clientId();
+
+ @Value.Default
+ @SerializedName("max_payload_size_bytes")
+ default int maxPayloadSizeBytes() {
+ return 1024 * 1024;
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java
new file mode 100644
index 00000000..393fe40f
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+public class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> {
+
+ private final Gson gson;
+
+ public static KafkaSinkParser create() {
+ return new KafkaSinkParser(gsonInstance());
+ }
+
+ KafkaSinkParser(Gson gson) {
+ this.gson = gson;
+ }
+
+ @Override
+ public KafkaSink unsafeParse(JsonObject input) {
+ assertStreamType(input, "kafka");
+
+ final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info");
+ final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+
+ return new GsonKafkaSink(kafkaInfo, null);
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java
new file mode 100644
index 00000000..8b48a2a4
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+public class KafkaSourceParser implements StreamFromGsonParser<KafkaSource> {
+ private final Gson gson;
+
+ public static KafkaSourceParser create() {
+ return new KafkaSourceParser(gsonInstance());
+ }
+
+ KafkaSourceParser(Gson gson) {
+ this.gson = gson;
+ }
+
+ @Override
+ public KafkaSource unsafeParse(JsonObject input) {
+ assertStreamType(input, "kafka");
+
+ final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info");
+ final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+
+ return new GsonKafkaSource(kafkaInfo, null);
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
index 11481561..ecb0b553 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
@@ -21,6 +21,8 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
import org.jetbrains.annotations.Nullable;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
@@ -29,6 +31,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
* @version 1.2.1
*/
@ExperimentalApi
+@Value.Immutable
+@Gson.TypeAdapters
public interface AafCredentials {
@Nullable String username();
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java
index 3ccce215..e3389207 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java
@@ -29,6 +29,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
* @version 1.2.1
*/
@ExperimentalApi
-public interface SinkStream {
+public interface SinkStream extends DataStream {
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java
index 78bb5b51..2bea143b 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java
@@ -29,6 +29,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
* @version 1.2.1
*/
@ExperimentalApi
-public interface SourceStream {
+public interface SourceStream extends DataStream {
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
new file mode 100644
index 00000000..97f07a29
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
@@ -0,0 +1,49 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+@ExperimentalApi
+public interface Kafka {
+
+ String bootstrapServers();
+
+ String topicName();
+
+ @Nullable AafCredentials aafCredentials();
+
+ @Nullable String clientRole();
+
+ @Nullable String clientId();
+
+ @Value.Default
+ default int maxPayloadSizeBytes() {
+ return 1024 * 1024;
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java
new file mode 100644
index 00000000..514881fe
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+
+import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+@ExperimentalApi
+@Value.Immutable
+public abstract interface KafkaSink extends Kafka, SinkStream {
+
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java
new file mode 100644
index 00000000..65280a98
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+@ExperimentalApi
+@Value.Immutable
+public interface KafkaSource extends Kafka, SourceStream {
+
+ @Nullable String consumerGroupId();
+}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java
new file mode 100644
index 00000000..b5481203
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java
@@ -0,0 +1,115 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.in;
+import static org.junit.jupiter.api.Assertions.*;
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.vavr.Function1;
+import io.vavr.control.Either;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class KafkaSinkParserTest {
+
+ private final StreamFromGsonParser<KafkaSink> cut = StreamFromGsonParsers.kafkaSinkParser();
+
+ @Test
+ void precondition_assureInstanceOf() {
+ assertThat(cut).isInstanceOf(KafkaSinkParser.class);
+ }
+
+ @Test
+ void shouldParseMinimalKafkaSinkDefinition() throws IOException {
+ // given
+ JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_minimal.json");
+
+ // when
+ final KafkaSink result = cut.unsafeParse(input);
+
+ // then
+ assertThat(result.aafCredentials()).isNull();
+ assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
+ assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
+ assertThat(result.clientId()).isNull();
+ assertThat(result.clientRole()).isNull();
+ }
+
+ @Test
+ void shouldParseBasicKafkaSinkDefinition() throws IOException {
+ // given
+ JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink.json");
+
+ // when
+ final KafkaSink result = cut.unsafeParse(input);
+
+ // then
+ assertThat(result.aafCredentials()).isNull();
+ assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
+ assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
+ assertThat(result.clientId()).isEqualTo("1500462518108");
+ assertThat(result.clientRole()).isEqualTo("com.dcae.member");
+ }
+
+ @Test
+ void shouldReturnErrorWhenStructureIsWrong() throws IOException {
+ // given
+ JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_missing_child.json");
+
+ // when
+ final Either<StreamParserError, KafkaSink> result = cut.parse(input);
+
+ // then
+ assertThat(result.isRight()).describedAs("should not be right").isFalse();
+ result.peekLeft(error -> {
+ assertThat(error.message()).contains("kafka_info");
+ });
+ }
+
+ @Test
+ void shouldReturnErrorWhenTypeIsWrong() throws IOException {
+ // given
+ JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_invalid_type.json");
+
+ // when
+ final Either<StreamParserError, KafkaSink> result = cut.parse(input);
+
+ // then
+ assertThat(result.isRight()).describedAs("should not be right").isFalse();
+ result.peekLeft(error -> {
+ assertThat(error.message()).containsIgnoringCase("invalid stream type");
+ assertThat(error.message()).containsIgnoringCase("kafka");
+ assertThat(error.message()).containsIgnoringCase("message_router");
+ });
+ }
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java
new file mode 100644
index 00000000..87131285
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class KafkaSourceParserTest {
+
+ private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser();
+
+ @Test
+ void precondition_assureInstanceOf() {
+ assertThat(cut).isInstanceOf(KafkaSourceParser.class);
+ }
+
+ @Test
+ void shouldParseMinimalKafkaSourceDefinition() throws IOException {
+ // given
+ JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_source_minimal.json");
+
+ // when
+ final KafkaSource result = cut.unsafeParse(input);
+
+ // then
+ assertThat(result.aafCredentials()).isNull();
+ assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
+ assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
+ assertThat(result.clientId()).isNull();
+ assertThat(result.clientRole()).isNull();
+ }
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json
new file mode 100644
index 00000000..b60388d5
--- /dev/null
+++ b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json
@@ -0,0 +1,9 @@
+{
+ "type": "kafka",
+ "kafka_info": {
+ "client_role": "com.dcae.member",
+ "client_id": "1500462518108",
+ "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060",
+ "topic_name": "HVVES_PERF3GPP"
+ }
+}
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json
new file mode 100644
index 00000000..0ee88adb
--- /dev/null
+++ b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json
@@ -0,0 +1,7 @@
+{
+ "type": "message_router",
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060",
+ "topic_name": "HVVES_PERF3GPP"
+ }
+}
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_minimal.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_minimal.json
new file mode 100644
index 00000000..da8dd4fe
--- /dev/null
+++ b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_minimal.json
@@ -0,0 +1,7 @@
+{
+ "type": "kafka",
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060",
+ "topic_name": "HVVES_PERF3GPP"
+ }
+}
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_missing_child.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_missing_child.json
new file mode 100644
index 00000000..d2591b3a
--- /dev/null
+++ b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_missing_child.json
@@ -0,0 +1,3 @@
+{
+ "type": "kafka"
+}
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_source_minimal.json b/rest-services/cbs-client/src/test/resources/streams/kafka_source_minimal.json
new file mode 100644
index 00000000..da8dd4fe
--- /dev/null
+++ b/rest-services/cbs-client/src/test/resources/streams/kafka_source_minimal.json
@@ -0,0 +1,7 @@
+{
+ "type": "kafka",
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060",
+ "topic_name": "HVVES_PERF3GPP"
+ }
+}