summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Wielebski <piotr.wielebski@nokia.com>2019-03-20 08:58:51 +0000
committerGerrit Code Review <gerrit@onap.org>2019-03-20 08:58:51 +0000
commit1903eacfadddd641b72c36f70df93b0475025c25 (patch)
tree5f9f7bfb389f9bd8b96fc3b15d8c41e591b69dc2
parent99fa950dabd3be4c0f0279d85ad56c896e07370f (diff)
parent565ec734f46a15bb3c87fe02a32613fc86c0eb22 (diff)
Merge "Kafka streams parsers - additions"
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java7
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java57
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java3
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java9
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java7
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java77
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java43
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java)24
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java)24
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java)12
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java)6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java)6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java)2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java)25
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java)25
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java)14
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java)6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java)5
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java)2
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java)24
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java (renamed from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java)24
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java51
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java4
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java6
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java41
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java35
-rw-r--r--rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java8
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java8
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java98
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java2
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java2
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java3
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java60
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java (renamed from rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java)58
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java (renamed from rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java)56
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java (renamed from rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java)29
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java (renamed from rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java)27
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java (renamed from rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java)42
-rw-r--r--rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java120
-rw-r--r--rest-services/cbs-client/src/test/resources/sample_config.json32
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json (renamed from rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json)0
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/kafka_sink.json4
-rw-r--r--rest-services/cbs-client/src/test/resources/streams/kafka_source.json14
46 files changed, 853 insertions, 257 deletions
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java
index 36589dad..989bd2db 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java
@@ -20,9 +20,9 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsLookup;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import reactor.core.publisher.Mono;
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java
index 15c4eea2..dfd0e2f7 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java
@@ -19,17 +19,16 @@
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+import static java.lang.String.valueOf;
+
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.collection.List;
-import org.jetbrains.annotations.NotNull;
-
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-
-import static java.lang.String.valueOf;
+import org.jetbrains.annotations.NotNull;
/**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java
new file mode 100644
index 00000000..4fdb31b1
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.collection.Stream;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+@ExperimentalApi
+public final class DataStreams {
+
+ private DataStreams() {
+ }
+
+ public static Stream<RawDataStream<JsonObject>> namedSources(JsonObject rootJson) {
+ return createCollectionOfStreams(rootJson, DataStreamDirection.SOURCE);
+ }
+
+ public static Stream<RawDataStream<JsonObject>> namedSinks(JsonObject rootJson) {
+ return createCollectionOfStreams(rootJson, DataStreamDirection.SINK);
+ }
+
+ private static Stream<RawDataStream<JsonObject>> createCollectionOfStreams(JsonObject rootJson, DataStreamDirection direction) {
+ final JsonElement streamsJson = rootJson.get(direction.configurationKey());
+ return streamsJson == null
+ ? Stream.empty()
+ : DataStreamUtils.mapJsonToStreams(streamsJson, direction);
+ }
+
+
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
index f18f2175..460d7100 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
@@ -21,10 +21,7 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
import com.google.gson.JsonObject;
-import io.vavr.control.Either;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
/**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
index 9d703bb3..7ae92baf 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
@@ -20,12 +20,17 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.*;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSourceParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSourceParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since March 2019
+ * @since 1.1.4
*/
public final class StreamFromGsonParsers {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
index 3467c809..69016ed8 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
@@ -26,6 +26,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
/**
* A generic data stream parser which parses {@code T} to data stream {@code S}.
@@ -33,7 +34,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Dat
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @param <T> input data type, eg. Gson Object
* @param <S> output data type
- * @since 1.1.3
+ * @since 1.1.4
*/
@ExperimentalApi
public interface StreamParser<T, S extends DataStream> {
@@ -44,7 +45,7 @@ public interface StreamParser<T, S extends DataStream> {
* @param input - the input data
* @return Right(parsing result) or Left(parsing error)
*/
- default Either<StreamParserError, S> parse(T input) {
+ default Either<StreamParserError, S> parse(RawDataStream<T> input) {
return Try.of(() -> unsafeParse(input))
.toEither()
.mapLeft(StreamParserError::fromThrowable);
@@ -58,7 +59,7 @@ public interface StreamParser<T, S extends DataStream> {
* @return parsing result
* @throws StreamParsingException when parsing was unsuccessful
*/
- default S unsafeParse(T input) {
+ default S unsafeParse(RawDataStream<T> input) {
return parse(input).getOrElseThrow(StreamParsingException::new);
}
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java
index 05bfc9be..9be08e3c 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java
@@ -24,9 +24,9 @@ import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import reactor.core.publisher.Mono;
public class CbsClientImpl implements CbsClient {
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java
index 53d0bd34..89daebc8 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java
@@ -22,12 +22,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
-
import java.net.InetSocketAddress;
-
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import reactor.core.publisher.Mono;
/**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java
new file mode 100644
index 00000000..d34b1440
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.collection.Stream;
+import java.io.IOException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public final class DataStreamUtils {
+
+ public static Stream<RawDataStream<JsonObject>> mapJsonToStreams(JsonElement streamsJson,
+ DataStreamDirection direction) {
+ return Stream.ofAll(streamsJson.getAsJsonObject().entrySet())
+ .map(namedSinkJson -> {
+ final JsonObject jsonObject = namedSinkJson.getValue().getAsJsonObject();
+ return rawDataStream(namedSinkJson.getKey(), direction, jsonObject);
+ });
+ }
+
+ public static void assertStreamType(
+ RawDataStream<JsonObject> json,
+ String expectedType,
+ DataStreamDirection expectedDirection) {
+ if (!json.type().equals(expectedType)) {
+ throw new IllegalArgumentException(
+ "Invalid stream type. Expected '" + expectedType + "', but was '" + json.type() + "'");
+ }
+ if (json.direction() != expectedDirection) {
+ throw new IllegalArgumentException(
+ "Invalid stream direction. Expected '" + expectedDirection + "', but was '" + json.direction()
+ + "'");
+ }
+ }
+
+ public static RawDataStream<JsonObject> readSourceFromResource(String resource) throws IOException {
+ return rawDataStream(resource, DataStreamDirection.SOURCE, GsonUtils.readObjectFromResource(resource));
+ }
+
+ public static RawDataStream<JsonObject> readSinkFromResource(String resource) throws IOException {
+ return rawDataStream(resource, DataStreamDirection.SINK, GsonUtils.readObjectFromResource(resource));
+ }
+
+ private static RawDataStream<JsonObject> rawDataStream(String name, DataStreamDirection direction, JsonObject json) {
+ return ImmutableRawDataStream.<JsonObject>builder()
+ .name(name)
+ .direction(direction)
+ .type(GsonUtils.requiredString(json, "type"))
+ .descriptor(json)
+ .build();
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
index a0880165..0b662286 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
@@ -26,14 +26,14 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.vavr.Lazy;
-
+import io.vavr.control.Option;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Map.Entry;
import java.util.stream.Collectors;
-
-import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.GsonAdaptersMessageRouterDmaapInfo;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.GsonAdaptersKafkaInfo;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSource;
@@ -42,7 +42,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since March 2019
*/
-final class GsonUtils {
+public final class GsonUtils {
+
private static final Lazy<Gson> GSON = Lazy.of(() -> {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersKafkaInfo());
@@ -56,39 +57,35 @@ final class GsonUtils {
private GsonUtils() {
}
- static Gson gsonInstance() {
+ public static Gson gsonInstance() {
return GSON.get();
}
- static void assertStreamType(JsonObject json, String expectedType) {
- final String actualType = requiredString(json, "type");
- if (!actualType.equals(expectedType)) {
- throw new IllegalArgumentException("Invalid stream type. Expected '" + expectedType + "', but was '" + actualType + "'");
- }
- }
-
- static String requiredString(JsonObject parent, String childName) {
+ public static String requiredString(JsonObject parent, String childName) {
return requiredChild(parent, childName).getAsString();
}
- static Option<String> optionalString(JsonObject parent, String childName) {
+ public static Option<String> optionalString(JsonObject parent, String childName) {
return Option.of(parent.get(childName).getAsString());
}
- static JsonElement requiredChild(JsonObject parent, String childName) {
- if (parent.has(childName)) {
- return parent.get(childName);
- } else {
- throw new IllegalArgumentException(
- "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + stringifyChildrenNames(parent));
- }
+ public static JsonElement requiredChild(JsonObject parent, String childName) {
+ return optionalChild(parent, childName)
+ .getOrElseThrow(() -> new IllegalArgumentException(
+ "Could not find sub-node '" + childName + "'. Actual sub-nodes: "
+ + stringifyChildrenNames(parent)));
+
+ }
+
+ public static Option<JsonElement> optionalChild(JsonObject parent, String childName) {
+ return Option.of(parent.get(childName));
}
- static JsonObject readObjectFromResource(String resource) throws IOException {
+ public static JsonObject readObjectFromResource(String resource) throws IOException {
return readFromResource(resource).getAsJsonObject();
}
- static JsonElement readFromResource(String resource) throws IOException {
+ public static JsonElement readFromResource(String resource) throws IOException {
try (Reader reader = new InputStreamReader(GsonUtils.class.getResourceAsStream(resource))) {
return new JsonParser().parse(reader);
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java
index 468b6180..42b86aee 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java
@@ -17,19 +17,23 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
*/
@@ -45,12 +49,12 @@ public final class DataRouterSinkParser implements StreamFromGsonParser<DataRout
this.gson = gson;
}
- public DataRouterSink unsafeParse(JsonObject input) {
- assertStreamType(input, DATA_ROUTER_TYPE);
-
- final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+ @Override
+ public DataRouterSink unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SINK);
- return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class);
+ final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
+ return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class).withName(input.name());
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java
index d78c3dde..7d29a90f 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java
@@ -17,19 +17,23 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSource;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSource;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
*/
@@ -45,12 +49,12 @@ public final class DataRouterSourceParser implements StreamFromGsonParser<DataRo
this.gson = gson;
}
- public DataRouterSource unsafeParse(JsonObject input) {
- assertStreamType(input, DATA_ROUTER_TYPE);
-
- final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+ @Override
+ public DataRouterSource unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SOURCE);
- return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class);
+ final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
+ return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class).withName(input.name());
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java
index 10c00e72..c5d254f0 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -31,15 +31,21 @@ import java.util.Objects;
*/
abstract class GsonMessageRouter implements MessageRouter {
+ private final String name;
private final MessageRouterDmaapInfo dmaapInfo;
private final AafCredentials aafCredentials;
- GsonMessageRouter(@NotNull MessageRouterDmaapInfo dmaapInfo,
- @Nullable AafCredentials aafCredentials) {
+ GsonMessageRouter(String name, @NotNull MessageRouterDmaapInfo dmaapInfo,
+ @Nullable AafCredentials aafCredentials) {
+ this.name = name;
this.dmaapInfo = Objects.requireNonNull(dmaapInfo, "dmaapInfo");
this.aafCredentials = aafCredentials;
}
+ public String name() {
+ return name;
+ }
+
@Override
public String topicUrl() {
return dmaapInfo.topicUrl();
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java
index da218420..5eef48d9 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -30,8 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
public class GsonMessageRouterSink extends GsonMessageRouter implements MessageRouterSink {
GsonMessageRouterSink(
- @NotNull MessageRouterDmaapInfo dmaapInfo,
+ String name, @NotNull MessageRouterDmaapInfo dmaapInfo,
@Nullable AafCredentials aafCredentials) {
- super(dmaapInfo, aafCredentials);
+ super(name, dmaapInfo, aafCredentials);
}
} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java
index b69c53db..d93a1d50 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -30,8 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
public class GsonMessageRouterSource extends GsonMessageRouter implements MessageRouterSource {
GsonMessageRouterSource(
- @NotNull MessageRouterDmaapInfo dmaapInfo,
+ String name, @NotNull MessageRouterDmaapInfo dmaapInfo,
@Nullable AafCredentials aafCredentials) {
- super(dmaapInfo, aafCredentials);
+ super(name, dmaapInfo, aafCredentials);
}
} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java
index ced5ad55..0ce0f80e 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import com.google.gson.annotations.SerializedName;
import org.immutables.gson.Gson;
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java
index 56f53932..1f518fe8 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java
@@ -17,20 +17,24 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-
public final class MessageRouterSinkParser implements StreamFromGsonParser<MessageRouterSink> {
private final Gson gson;
@@ -43,15 +47,16 @@ public final class MessageRouterSinkParser implements StreamFromGsonParser<Messa
this.gson = gson;
}
- public MessageRouterSink unsafeParse(JsonObject input) {
- assertStreamType(input, MESSAGE_ROUTER_TYPE);
+ @Override
+ public MessageRouterSink unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SINK);
- final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class);
+ final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
- final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+ final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
- return new GsonMessageRouterSink(dmaapInfo, aafCredentials);
+ return new GsonMessageRouterSink(input.name(), dmaapInfo, aafCredentials);
}
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java
index 25cf9e0e..c6c1b22c 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java
@@ -17,20 +17,24 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
-
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
*/
@@ -46,15 +50,16 @@ public final class MessageRouterSourceParser implements StreamFromGsonParser<Mes
this.gson = gson;
}
- public MessageRouterSource unsafeParse(JsonObject input) {
- assertStreamType(input, MESSAGE_ROUTER_TYPE);
+ @Override
+ public MessageRouterSource unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SOURCE);
- final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class);
+ final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
- final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+ final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
- return new GsonMessageRouterSource(dmaapInfo, aafCredentials);
+ return new GsonMessageRouterSource(input.name(), dmaapInfo, aafCredentials);
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java
index 2ad37a84..ad9b021e 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
@@ -32,15 +32,23 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
*/
abstract class GsonKafka implements Kafka {
- protected final KafkaInfo kafkaInfo;
+ private final String name;
+ final KafkaInfo kafkaInfo;
private final AafCredentials aafCredentials;
- GsonKafka(@NotNull KafkaInfo kafkaInfo,
+ GsonKafka(
+ @NotNull String name,
+ @NotNull KafkaInfo kafkaInfo,
@Nullable AafCredentials aafCredentials) {
+ this.name = Objects.requireNonNull(name, "name");
this.kafkaInfo = Objects.requireNonNull(kafkaInfo, "kafkaInfo");
this.aafCredentials = aafCredentials;
}
+ public String name() {
+ return name;
+ }
+
@Override
public String bootstrapServers() {
return kafkaInfo.bootstrapServers();
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java
index c45f8470..4990f80a 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -32,8 +32,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
class GsonKafkaSink extends GsonKafka implements KafkaSink {
GsonKafkaSink(
+ @NotNull String name,
@NotNull KafkaInfo kafkaInfo,
@Nullable AafCredentials aafCredentials) {
- super(kafkaInfo, aafCredentials);
+ super(name, kafkaInfo, aafCredentials);
}
+
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java
index 1509d9d7..137964c3 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -32,9 +32,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
class GsonKafkaSource extends GsonKafka implements KafkaSource {
GsonKafkaSource(
+ @NotNull String name,
@NotNull KafkaInfo kafkaInfo,
@Nullable AafCredentials aafCredentials) {
- super(kafkaInfo, aafCredentials);
+ super(name, kafkaInfo, aafCredentials);
}
@Override
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java
index 8b17a8d4..fd5602e6 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import com.google.gson.annotations.SerializedName;
import org.immutables.gson.Gson;
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java
index f9a546c2..59373c45 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java
@@ -18,16 +18,21 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo;
import com.google.gson.Gson;
-import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
/**
@@ -46,12 +51,13 @@ public final class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> {
}
@Override
- public KafkaSink unsafeParse(JsonObject input) {
- assertStreamType(input, KAFKA_TYPE);
+ public KafkaSink unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SINK);
+ final JsonObject json = input.descriptor();
- final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME);
- final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+ final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
+ final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull();
- return new GsonKafkaSink(kafkaInfo, null);
+ return new GsonKafkaSink(input.name(), kafkaInfo, aafCreds);
}
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java
index 08c02b47..6ac1dc99 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java
@@ -18,16 +18,21 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo;
import com.google.gson.Gson;
-import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
/**
@@ -46,12 +51,13 @@ public final class KafkaSourceParser implements StreamFromGsonParser<KafkaSource
}
@Override
- public KafkaSource unsafeParse(JsonObject input) {
- assertStreamType(input, KAFKA_TYPE);
+ public KafkaSource unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SOURCE);
+ final JsonObject json = input.descriptor();
- final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME);
- final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+ final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
+ final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull();
- return new GsonKafkaSource(kafkaInfo, null);
+ return new GsonKafkaSource(input.name(), kafkaInfo, aafCreds);
}
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java
new file mode 100644
index 00000000..4cfa33ac
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.optionalChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+final class KafkaUtils {
+
+ private KafkaUtils() {
+ }
+
+ static KafkaInfo extractKafkaInfo(Gson gson, JsonObject input) {
+ final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info");
+ return gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+ }
+
+ static Option<AafCredentials> extractAafCredentials(Gson gson, JsonObject input) {
+ return optionalChild(input, "aaf_credentials")
+ .map(aafCredsJson -> gson.fromJson(aafCredsJson, ImmutableAafCredentials.class));
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
index e8d63192..c3c70b78 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
@@ -36,9 +36,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
@Gson.TypeAdapters
public interface AafCredentials {
- @SerializedName("aaf_username")
+ @SerializedName(value = "username", alternate = "aaf_username")
@Nullable String username();
- @SerializedName("aaf_password")
+ @SerializedName(value = "password", alternate = "aaf_password")
@Nullable String password();
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java
index 43d9d726..37bf7e57 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
@@ -28,5 +29,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
*/
@ExperimentalApi
public interface DataStream {
-
+ @Value.Default
+ default String name() {
+ return "";
+ }
}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java
new file mode 100644
index 00000000..f3cac547
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public enum DataStreamDirection {
+
+ SINK("streams_publishes"),
+ SOURCE("streams_subscribes");
+
+ private final String configurationKey;
+
+ DataStreamDirection(String configurationKey) {
+ this.configurationKey = configurationKey;
+ }
+
+ public String configurationKey() {
+ return configurationKey;
+ }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java
new file mode 100644
index 00000000..7a39ede5
--- /dev/null
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+@Value.Immutable
+public interface RawDataStream<T> {
+ String name();
+ String type();
+ DataStreamDirection direction();
+ T descriptor();
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
index 97f07a29..1810fc6c 100644
--- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
+++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
@@ -20,6 +20,9 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+import static io.vavr.Predicates.not;
+
+import io.vavr.collection.List;
import org.immutables.value.Value;
import org.jetbrains.annotations.Nullable;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
@@ -46,4 +49,9 @@ public interface Kafka {
default int maxPayloadSizeBytes() {
return 1024 * 1024;
}
+
+ @Value.Derived
+ default List<String> bootstrapServerList() {
+ return List.of(bootstrapServers().split(",")).filter(not(String::isEmpty));
+ }
}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java
index c9ceeaf1..8a5edcc9 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java
@@ -20,17 +20,15 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.vavr.collection.List;
+import java.math.BigInteger;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
-import java.math.BigInteger;
-
-
-import static org.assertj.core.api.Assertions.assertThat;
-
class MerkleTreeParserTest {
private final MerkleTreeParser cut = new MerkleTreeParser();
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
index e862d849..e2833fe5 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
@@ -20,20 +20,29 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendResource;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendString;
import com.google.gson.JsonObject;
+import io.vavr.collection.Map;
import io.vavr.collection.Stream;
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -128,6 +137,91 @@ class CbsClientImplIT {
.verify(Duration.ofSeconds(5));
}
+ @Test
+ void testCbsClientWithStreamsParsing() {
+ // given
+ final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+ final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
+ final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+
+ // when
+ final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+ .map(json ->
+ DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
+ );
+
+ // then
+ StepVerifier.create(result)
+ .consumeNextWith(kafkaSink -> {
+ assertThat(kafkaSink.name()).isEqualTo("perf3gpp");
+ assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060");
+ assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP");
+ })
+ .expectComplete()
+ .verify(Duration.ofSeconds(5));
+ }
+
+ @Test
+ void testCbsClientWithStreamsParsingUsingSwitch() {
+ // given
+ final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+ final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+ // TODO: Use these parsers below
+ final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
+ final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
+
+ // when
+ final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+ .map(json -> {
+ final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
+ .groupBy(RawDataStream::type);
+
+ final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty())
+ .map(kafkaSinkParser::unsafeParse);
+ final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty())
+ .map(mrSinkParser::unsafeParse);
+
+ assertThat(allKafkaSinks.size())
+ .describedAs("Number of kafka sinks")
+ .isEqualTo(2);
+ assertThat(allMrSinks.size())
+ .describedAs("Number of DMAAP-MR sinks")
+ .isEqualTo(1);
+
+ return true;
+ })
+ .then();
+
+ // then
+ StepVerifier.create(result)
+ .expectComplete()
+ .verify(Duration.ofSeconds(5));
+ }
+
+ @Test
+ void testCbsClientWithStreamsParsingWhenUsingInvalidParser() {
+ // given
+ final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+ final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
+ final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+
+ // when
+ final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+ .map(json ->
+ DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
+ );
+
+ // then
+ StepVerifier.create(result)
+ .expectErrorSatisfies(ex -> {
+ assertThat(ex).isInstanceOf(IllegalArgumentException.class);
+ assertThat(ex).hasMessageContaining("Invalid stream type");
+ assertThat(ex).hasMessageContaining("message_router");
+ assertThat(ex).hasMessageContaining("kafka");
+ })
+ .verify(Duration.ofSeconds(5));
+ }
+
private String sampleConfigValue(JsonObject obj) {
return obj.get(SAMPLE_CONFIG_KEY).getAsString();
}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java
index 9fd7cc88..617904f9 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java
@@ -30,8 +30,8 @@ import static org.mockito.Mockito.verify;
import com.google.gson.JsonObject;
import java.net.InetSocketAddress;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import reactor.core.publisher.Mono;
/**
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java
index 6843e0e3..94ff53f9 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java
@@ -31,9 +31,9 @@ import com.google.gson.JsonParser;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java
index d0485f57..7835a5f9 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java
@@ -21,9 +21,6 @@
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
import io.vavr.CheckedFunction0;
-import io.vavr.Function0;
-import java.io.IOException;
-import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java
deleted file mode 100644
index 87131285..00000000
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import com.google.gson.JsonObject;
-import java.io.IOException;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since March 2019
- */
-class KafkaSourceParserTest {
-
- private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser();
-
- @Test
- void precondition_assureInstanceOf() {
- assertThat(cut).isInstanceOf(KafkaSourceParser.class);
- }
-
- @Test
- void shouldParseMinimalKafkaSourceDefinition() throws IOException {
- // given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_source_minimal.json");
-
- // when
- final KafkaSource result = cut.unsafeParse(input);
-
- // then
- assertThat(result.aafCredentials()).isNull();
- assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
- assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
- assertThat(result.clientId()).isNull();
- assertThat(result.clientRole()).isNull();
- }
-} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java
index 398ebcd9..7092de5a 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
@@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
@@ -37,6 +41,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
class DataRouterSinkParserTest {
+
private static final String SAMPLE_LOCATION = "mtc00";
private static final String SAMPLE_PUBLISH_URL = "https://we-are-data-router.us/feed/xyz";
private static final String SAMPLE_LOG_URL = "https://we-are-data-router.us/feed/xyz/logs";
@@ -44,49 +49,54 @@ class DataRouterSinkParserTest {
private static final String SAMPLE_PASSWORD = "some-password";
private static final String SAMPLE_PUBLISHER_ID = "123456";
- private static final Gson gson = new Gson();
-
private final StreamFromGsonParser<DataRouterSink> streamParser = StreamFromGsonParsers.dataRouterSinkParser();
- private static final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder()
- .location(SAMPLE_LOCATION)
- .publishUrl(SAMPLE_PUBLISH_URL)
- .logUrl(SAMPLE_LOG_URL)
- .username(SAMPLE_USER)
- .password(SAMPLE_PASSWORD)
- .publisherId(SAMPLE_PUBLISHER_ID)
- .build();
-
- private static final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder()
- .publishUrl(SAMPLE_PUBLISH_URL)
- .build();
-
@Test
void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json");
+
// when
DataRouterSink result = streamParser.unsafeParse(input);
+
// then
+ final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder()
+ .name(input.name())
+ .location(SAMPLE_LOCATION)
+ .publishUrl(SAMPLE_PUBLISH_URL)
+ .logUrl(SAMPLE_LOG_URL)
+ .username(SAMPLE_USER)
+ .password(SAMPLE_PASSWORD)
+ .publisherId(SAMPLE_PUBLISHER_ID)
+ .build();
assertThat(result).isEqualTo(fullConfigurationStream);
}
@Test
void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
//given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_minimal.json");
+ RawDataStream<JsonObject> input = DataStreamUtils
+ .readSinkFromResource("/streams/data_router_sink_minimal.json");
+
// when
DataRouterSink result = streamParser.unsafeParse(input);
+
// then
+ final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder()
+ .name(input.name())
+ .publishUrl(SAMPLE_PUBLISH_URL)
+ .build();
assertThat(result).isEqualTo(minimalConfigurationStream);
}
@Test
void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json");
+
// when
Either<StreamParserError, DataRouterSink> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
@@ -100,9 +110,17 @@ class DataRouterSinkParserTest {
@Test
void emptyConfiguration_shouldParseToStreamParserError() {
// given
- JsonObject input = gson.fromJson("{}", JsonObject.class);
+ JsonObject json = new JsonObject();
+ final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
+ .name("empty")
+ .type("data_router")
+ .descriptor(json)
+ .direction(DataStreamDirection.SINK)
+ .build();
+
// when
Either<StreamParserError, DataRouterSink> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java
index 681fa147..b2d01309 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -28,7 +28,11 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.St
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
import java.io.IOException;
@@ -38,54 +42,60 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
public class DataRouterSourceParserTest {
+
private static final String SAMPLE_LOCATION = "mtc00";
private static final String SAMPLE_DELIVERY_URL = "https://my-subscriber-app.dcae:8080/target-path";
private static final String SAMPLE_USER = "some-user";
private static final String SAMPLE_PASSWORD = "some-password";
private static final String SAMPLE_SUBSCRIBER_ID = "789012";
- private static final Gson gson = new Gson();
-
- private static final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder()
- .location(SAMPLE_LOCATION)
- .deliveryUrl(SAMPLE_DELIVERY_URL)
- .username(SAMPLE_USER)
- .password(SAMPLE_PASSWORD)
- .subscriberId(SAMPLE_SUBSCRIBER_ID)
- .build();
-
- private static final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder()
- .build();
-
-
private final StreamFromGsonParser<DataRouterSource> streamParser = StreamFromGsonParsers.dataRouterSourceParser();
@Test
void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/data_router_source_full.json");
+
// when
DataRouterSource result = streamParser.unsafeParse(input);
+
// then
+
+ final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder()
+ .name(input.name())
+ .location(SAMPLE_LOCATION)
+ .deliveryUrl(SAMPLE_DELIVERY_URL)
+ .username(SAMPLE_USER)
+ .password(SAMPLE_PASSWORD)
+ .subscriberId(SAMPLE_SUBSCRIBER_ID)
+ .build();
assertThat(result).isEqualTo(fullConfigurationStream);
}
@Test
void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_minimal.json");
+ RawDataStream<JsonObject> input = DataStreamUtils
+ .readSourceFromResource("/streams/data_router_source_minimal.json");
+
// when
DataRouterSource result = streamParser.unsafeParse(input);
+
// then
+ final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder()
+ .name(input.name())
+ .build();
assertThat(result).isEqualTo(minimalConfigurationStream);
}
@Test
void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json");
+
// when
Either<StreamParserError, DataRouterSource> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
@@ -99,9 +109,17 @@ public class DataRouterSourceParserTest {
@Test
void emptyConfiguration_shouldBeParsedToStreamParserError() {
// given
- JsonObject input = gson.fromJson("{}", JsonObject.class);
+ JsonObject json = new JsonObject();
+ final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
+ .name("empty")
+ .type("data_router")
+ .descriptor(json)
+ .direction(DataStreamDirection.SOURCE)
+ .build();
+
// when
Either<StreamParserError, DataRouterSource> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java
index 63b04d1d..4d3b88b8 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
@@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
@@ -48,16 +52,16 @@ public class MessageRouterSinkParserTest {
private static final String SAMPLE_CLIENT_ID = "1500462518108";
private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic";
- private static final Gson gson = new Gson();
-
private final StreamFromGsonParser<MessageRouterSink> streamParser = StreamFromGsonParsers.messageRouterSinkParser();
@Test
void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json");
+
// when
MessageRouterSink result = streamParser.unsafeParse(input);
+
// then
assertThat(result).isInstanceOf(MessageRouterSink.class);
assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME);
@@ -71,10 +75,11 @@ public class MessageRouterSinkParserTest {
@Test
void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_minimal.json");
// when
MessageRouterSink result = streamParser.unsafeParse(input);
+
// then
assertThat(result).isInstanceOf(MessageRouterSink.class);
assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
@@ -86,9 +91,11 @@ public class MessageRouterSinkParserTest {
@Test
void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json");
+
// when
Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
@@ -102,9 +109,17 @@ public class MessageRouterSinkParserTest {
@Test
void emptyConfiguration_shouldParseToStreamParserError() {
// given
- JsonObject input = gson.fromJson("{}", JsonObject.class);
+ JsonObject json = new JsonObject();
+ final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
+ .name("empty")
+ .type("data_router")
+ .descriptor(json)
+ .direction(DataStreamDirection.SINK)
+ .build();
+
// when
Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java
index fea63d66..d497817f 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
@@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
@@ -48,16 +52,16 @@ public class MessageRouterSourceParserTest {
private static final String SAMPLE_CLIENT_ID = "1500462518108";
private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic";
- private static final Gson gson = new Gson();
-
private final StreamFromGsonParser<MessageRouterSource> streamParser = StreamFromGsonParsers.messageRouterSourceParser();
@Test
void fullConfiguration_shouldGenerateDataRouterSourceObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json");
+
// when
MessageRouterSource result = streamParser.unsafeParse(input);
+
// then
assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME);
assertThat(result.aafCredentials().password()).isEqualTo(SAMPLE_AAF_PASSWORD);
@@ -70,10 +74,11 @@ public class MessageRouterSourceParserTest {
@Test
void minimalConfiguration_shouldGenerateDataRouterSourceObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_minimal.json");
// when
MessageRouterSource result = streamParser.unsafeParse(input);
+
// then
assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
assertThat(result.aafCredentials().username()).isNull();
@@ -84,9 +89,11 @@ public class MessageRouterSourceParserTest {
@Test
void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/data_router_sink_full.json");
+
// when
Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
@@ -100,7 +107,13 @@ public class MessageRouterSourceParserTest {
@Test
void emptyConfiguration_shouldParseToStreamParserError() {
// given
- JsonObject input = gson.fromJson("{}", JsonObject.class);
+ JsonObject json = new JsonObject();
+ final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
+ .name("empty")
+ .type("data_router")
+ .descriptor(json)
+ .direction(DataStreamDirection.SOURCE)
+ .build();
// when
Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input);
// then
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java
index b5481203..5974639c 100644
--- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java
@@ -18,22 +18,21 @@
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.in;
-import static org.junit.jupiter.api.Assertions.*;
import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import io.vavr.Function1;
import io.vavr.control.Either;
import java.io.IOException;
-import java.io.InputStreamReader;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
/**
@@ -52,7 +51,7 @@ class KafkaSinkParserTest {
@Test
void shouldParseMinimalKafkaSinkDefinition() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_minimal.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_minimal.json");
// when
final KafkaSink result = cut.unsafeParse(input);
@@ -66,15 +65,19 @@ class KafkaSinkParserTest {
}
@Test
- void shouldParseBasicKafkaSinkDefinition() throws IOException {
+ void shouldParseFullKafkaSinkDefinition() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink.json");
// when
final KafkaSink result = cut.unsafeParse(input);
// then
- assertThat(result.aafCredentials()).isNull();
+ final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder()
+ .username("the user")
+ .password("the passwd")
+ .build();
+ assertThat(result.aafCredentials()).isEqualTo(expectedCredentials);
assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
assertThat(result.clientId()).isEqualTo("1500462518108");
@@ -84,7 +87,7 @@ class KafkaSinkParserTest {
@Test
void shouldReturnErrorWhenStructureIsWrong() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_missing_child.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_missing_child.json");
// when
final Either<StreamParserError, KafkaSink> result = cut.parse(input);
@@ -99,7 +102,7 @@ class KafkaSinkParserTest {
@Test
void shouldReturnErrorWhenTypeIsWrong() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_invalid_type.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_invalid_type.json");
// when
final Either<StreamParserError, KafkaSink> result = cut.parse(input);
@@ -112,4 +115,19 @@ class KafkaSinkParserTest {
assertThat(error.message()).containsIgnoringCase("message_router");
});
}
+
+ @Test
+ void shouldReturnErrorWhenDirectionIsWrong() throws IOException {
+ // given
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_sink.json");
+
+ // when
+ final Either<StreamParserError, KafkaSink> result = cut.parse(input);
+
+ // then
+ assertThat(result.isRight()).describedAs("should not be right").isFalse();
+ result.peekLeft(error -> {
+ assertThat(error.message()).containsIgnoringCase("invalid stream direction");
+ });
+ }
} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java
new file mode 100644
index 00000000..d255d99a
--- /dev/null
+++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java
@@ -0,0 +1,120 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
+import io.vavr.control.Either;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class KafkaSourceParserTest {
+
+ private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser();
+
+ @Test
+ void precondition_assureInstanceOf() {
+ assertThat(cut).isInstanceOf(KafkaSourceParser.class);
+ }
+
+ @Test
+ void shouldParseMinimalKafkaSourceDefinition() throws IOException {
+ // given
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source_minimal.json");
+
+ // when
+ final KafkaSource result = cut.unsafeParse(input);
+
+ // then
+ assertThat(result.aafCredentials()).isNull();
+ assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
+ assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
+ assertThat(result.clientId()).isNull();
+ assertThat(result.clientRole()).isNull();
+ }
+
+ @Test
+ void shouldParseFullKafkaSourceDefinition() throws IOException {
+ // given
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source.json");
+
+ // when
+ final KafkaSource result = cut.unsafeParse(input);
+
+ // then
+ final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder()
+ .username("the user")
+ .password("the passwd")
+ .build();
+ assertThat(result.aafCredentials()).isEqualTo(expectedCredentials);
+ assertThat(result.bootstrapServerList()).isEqualTo(List.of("dmaap-mr-kafka-0:6060", "dmaap-mr-kafka-1:6060"));
+ assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
+ assertThat(result.consumerGroupId()).isEqualTo("nokia-perf3gpp-processor");
+ assertThat(result.clientId()).isEqualTo("1500462518108");
+ assertThat(result.clientRole()).isEqualTo("com.dcae.member");
+ }
+
+ @Test
+ void shouldReturnErrorWhenTypeIsWrong() throws IOException {
+ // given
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_invalid_type.json");
+
+ // when
+ final Either<StreamParserError, KafkaSource> result = cut.parse(input);
+
+ // then
+ assertThat(result.isRight()).describedAs("should not be right").isFalse();
+ result.peekLeft(error -> {
+ assertThat(error.message()).containsIgnoringCase("invalid stream type");
+ assertThat(error.message()).containsIgnoringCase("kafka");
+ assertThat(error.message()).containsIgnoringCase("message_router");
+ });
+ }
+
+ @Test
+ void shouldReturnErrorWhenDirectionIsWrong() throws IOException {
+ // given
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_source.json");
+
+ // when
+ final Either<StreamParserError, KafkaSource> result = cut.parse(input);
+
+ // then
+ assertThat(result.isRight()).describedAs("should not be right").isFalse();
+ result.peekLeft(error -> {
+ assertThat(error.message()).containsIgnoringCase("invalid stream direction");
+ });
+ }
+} \ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/resources/sample_config.json b/rest-services/cbs-client/src/test/resources/sample_config.json
index a95b723f..266326f4 100644
--- a/rest-services/cbs-client/src/test/resources/sample_config.json
+++ b/rest-services/cbs-client/src/test/resources/sample_config.json
@@ -1,3 +1,33 @@
{
- "keystore.path": "/var/run/security/keystore.p12"
+ "keystore.path": "/var/run/security/keystore.p12",
+ "streams_publishes": {
+ "perf3gpp": {
+ "type": "kafka",
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka:6060",
+ "topic_name": "HVVES_PERF3GPP"
+ }
+ },
+ "pnf_ready": {
+ "type": "message_router",
+ "dmaap_info": {
+ "topic_url": "http://message-router:3904/events/VES_PNF_READY"
+ }
+ },
+ "call_trace": {
+ "type": "kafka",
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka:6060",
+ "topic_name": "HVVES_TRACE"
+ }
+ }
+ },
+ "streams_subscribes": {
+ "measurements": {
+ "type": "message_router",
+ "dmaap_info": {
+ "topic_url": "http://message-router:3904/events/VES_MEASUREMENT"
+ }
+ }
+ }
}
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json b/rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json
index 0ee88adb..0ee88adb 100644
--- a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json
+++ b/rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json
index b60388d5..e7b45508 100644
--- a/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json
+++ b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json
@@ -1,5 +1,9 @@
{
"type": "kafka",
+ "aaf_credentials": {
+ "username": "the user",
+ "password": "the passwd"
+ },
"kafka_info": {
"client_role": "com.dcae.member",
"client_id": "1500462518108",
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_source.json b/rest-services/cbs-client/src/test/resources/streams/kafka_source.json
new file mode 100644
index 00000000..379dbef1
--- /dev/null
+++ b/rest-services/cbs-client/src/test/resources/streams/kafka_source.json
@@ -0,0 +1,14 @@
+{
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "the user",
+ "password": "the passwd"
+ },
+ "kafka_info": {
+ "client_role": "com.dcae.member",
+ "client_id": "1500462518108",
+ "bootstrap_servers": "dmaap-mr-kafka-0:6060,,dmaap-mr-kafka-1:6060,",
+ "topic_name": "HVVES_PERF3GPP",
+ "consumer_group_id": "nokia-perf3gpp-processor"
+ }
+}