From 565ec734f46a15bb3c87fe02a32613fc86c0eb22 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 14 Mar 2019 14:34:35 +0100 Subject: Kafka streams parsers - additions Change-Id: I98ca661682b41d76d3de668d6faeb6ebe02f92a8 Issue-ID: DCAEGEN2-1341 Signed-off-by: Piotr Jaszczyk --- .../services/cbs/client/api/CbsClientFactory.java | 2 +- .../cbs/client/api/listener/MerkleTreeParser.java | 7 +- .../cbs/client/api/streams/DataStreams.java | 57 +++++++++ .../client/api/streams/StreamFromGsonParser.java | 3 - .../client/api/streams/StreamFromGsonParsers.java | 9 +- .../cbs/client/api/streams/StreamParser.java | 7 +- .../services/cbs/client/impl/CbsClientImpl.java | 2 +- .../rest/services/cbs/client/impl/CbsLookup.java | 4 +- .../impl/streams/gson/DataRouterSinkParser.java | 57 --------- .../impl/streams/gson/DataRouterSourceParser.java | 57 --------- .../client/impl/streams/gson/DataStreamUtils.java | 77 ++++++++++++ .../cbs/client/impl/streams/gson/GsonKafka.java | 91 -------------- .../client/impl/streams/gson/GsonKafkaSink.java | 39 ------ .../client/impl/streams/gson/GsonKafkaSource.java | 45 ------- .../impl/streams/gson/GsonMessageRouter.java | 67 ----------- .../impl/streams/gson/GsonMessageRouterSink.java | 37 ------ .../impl/streams/gson/GsonMessageRouterSource.java | 37 ------ .../cbs/client/impl/streams/gson/GsonUtils.java | 43 ++++--- .../cbs/client/impl/streams/gson/KafkaInfo.java | 56 --------- .../client/impl/streams/gson/KafkaSinkParser.java | 57 --------- .../impl/streams/gson/KafkaSourceParser.java | 57 --------- .../impl/streams/gson/MessageRouterDmaapInfo.java | 42 ------- .../impl/streams/gson/MessageRouterSinkParser.java | 57 --------- .../streams/gson/MessageRouterSourceParser.java | 61 ---------- .../gson/dmaap/dr/DataRouterSinkParser.java | 61 ++++++++++ .../gson/dmaap/dr/DataRouterSourceParser.java | 61 ++++++++++ .../streams/gson/dmaap/mr/GsonMessageRouter.java | 73 +++++++++++ .../gson/dmaap/mr/GsonMessageRouterSink.java | 37 ++++++ .../gson/dmaap/mr/GsonMessageRouterSource.java | 37 ++++++ .../gson/dmaap/mr/MessageRouterDmaapInfo.java | 42 +++++++ .../gson/dmaap/mr/MessageRouterSinkParser.java | 62 ++++++++++ .../gson/dmaap/mr/MessageRouterSourceParser.java | 66 ++++++++++ .../client/impl/streams/gson/kafka/GsonKafka.java | 99 +++++++++++++++ .../impl/streams/gson/kafka/GsonKafkaSink.java | 41 +++++++ .../impl/streams/gson/kafka/GsonKafkaSource.java | 46 +++++++ .../client/impl/streams/gson/kafka/KafkaInfo.java | 56 +++++++++ .../impl/streams/gson/kafka/KafkaSinkParser.java | 63 ++++++++++ .../impl/streams/gson/kafka/KafkaSourceParser.java | 63 ++++++++++ .../client/impl/streams/gson/kafka/KafkaUtils.java | 51 ++++++++ .../cbs/client/model/streams/AafCredentials.java | 4 +- .../cbs/client/model/streams/DataStream.java | 6 +- .../client/model/streams/DataStreamDirection.java | 41 +++++++ .../cbs/client/model/streams/RawDataStream.java | 35 ++++++ .../cbs/client/model/streams/dmaap/Kafka.java | 8 ++ .../client/api/listener/MerkleTreeParserTest.java | 8 +- .../services/cbs/client/impl/CbsClientImplIT.java | 98 ++++++++++++++- .../cbs/client/impl/CbsClientImplTest.java | 2 +- .../services/cbs/client/impl/CbsLookupTest.java | 2 +- .../services/cbs/client/impl/DummyHttpServer.java | 3 - .../streams/gson/DataRouterSinkParserTest.java | 110 ----------------- .../streams/gson/DataRouterSourceParserTest.java | 108 ----------------- .../impl/streams/gson/KafkaSinkParserTest.java | 115 ------------------ .../impl/streams/gson/KafkaSourceParserTest.java | 60 ---------- .../streams/gson/MessageRouterSinkParserTest.java | 113 ----------------- .../gson/MessageRouterSourceParserTest.java | 111 ----------------- .../gson/dmaap/dr/DataRouterSinkParserTest.java | 128 ++++++++++++++++++++ .../gson/dmaap/dr/DataRouterSourceParserTest.java | 126 +++++++++++++++++++ .../gson/dmaap/mr/MessageRouterSinkParserTest.java | 128 ++++++++++++++++++++ .../dmaap/mr/MessageRouterSourceParserTest.java | 124 +++++++++++++++++++ .../streams/gson/kafka/KafkaSinkParserTest.java | 133 +++++++++++++++++++++ .../streams/gson/kafka/KafkaSourceParserTest.java | 120 +++++++++++++++++++ .../src/test/resources/sample_config.json | 32 ++++- .../test/resources/streams/kafka_invalid_type.json | 7 ++ .../src/test/resources/streams/kafka_sink.json | 4 + .../resources/streams/kafka_sink_invalid_type.json | 7 -- .../src/test/resources/streams/kafka_source.json | 14 +++ 66 files changed, 2036 insertions(+), 1440 deletions(-) create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java delete mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java delete mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java delete mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java delete mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java delete mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java delete mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java delete mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java create mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java create mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java create mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java create mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java create mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java create mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java create mode 100644 rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json delete mode 100644 rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json create mode 100644 rest-services/cbs-client/src/test/resources/streams/kafka_source.json diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java index 36589dad..989bd2db 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java @@ -20,9 +20,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsLookup; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import reactor.core.publisher.Mono; diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java index 15c4eea2..dfd0e2f7 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java @@ -19,17 +19,16 @@ */ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; +import static java.lang.String.valueOf; + import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.vavr.collection.List; -import org.jetbrains.annotations.NotNull; - import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; - -import static java.lang.String.valueOf; +import org.jetbrains.annotations.NotNull; /** diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java new file mode 100644 index 00000000..4fdb31b1 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.vavr.collection.Stream; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +@ExperimentalApi +public final class DataStreams { + + private DataStreams() { + } + + public static Stream> namedSources(JsonObject rootJson) { + return createCollectionOfStreams(rootJson, DataStreamDirection.SOURCE); + } + + public static Stream> namedSinks(JsonObject rootJson) { + return createCollectionOfStreams(rootJson, DataStreamDirection.SINK); + } + + private static Stream> createCollectionOfStreams(JsonObject rootJson, DataStreamDirection direction) { + final JsonElement streamsJson = rootJson.get(direction.configurationKey()); + return streamsJson == null + ? Stream.empty() + : DataStreamUtils.mapJsonToStreams(streamsJson, direction); + } + + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java index f18f2175..460d7100 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java @@ -21,10 +21,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; import com.google.gson.JsonObject; -import io.vavr.control.Either; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; /** diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java index 9d703bb3..7ae92baf 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java @@ -20,12 +20,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.*; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSourceParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSourceParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*; /** * @author Piotr Jaszczyk - * @since March 2019 + * @since 1.1.4 */ public final class StreamFromGsonParsers { diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java index 3467c809..69016ed8 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java @@ -26,6 +26,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; /** * A generic data stream parser which parses {@code T} to data stream {@code S}. @@ -33,7 +34,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Dat * @author Piotr Jaszczyk * @param input data type, eg. Gson Object * @param output data type - * @since 1.1.3 + * @since 1.1.4 */ @ExperimentalApi public interface StreamParser { @@ -44,7 +45,7 @@ public interface StreamParser { * @param input - the input data * @return Right(parsing result) or Left(parsing error) */ - default Either parse(T input) { + default Either parse(RawDataStream input) { return Try.of(() -> unsafeParse(input)) .toEither() .mapLeft(StreamParserError::fromThrowable); @@ -58,7 +59,7 @@ public interface StreamParser { * @return parsing result * @throws StreamParsingException when parsing was unsuccessful */ - default S unsafeParse(T input) { + default S unsafeParse(RawDataStream input) { return parse(input).getOrElseThrow(StreamParsingException::new); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java index 05bfc9be..9be08e3c 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java @@ -24,9 +24,9 @@ import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URL; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import reactor.core.publisher.Mono; public class CbsClientImpl implements CbsClient { diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java index 53d0bd34..89daebc8 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java @@ -22,12 +22,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; import com.google.gson.JsonArray; import com.google.gson.JsonObject; - import java.net.InetSocketAddress; - +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import reactor.core.publisher.Mono; /** diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java deleted file mode 100644 index 468b6180..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink; - -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; - -/** - * @author Kornel Janiak - */ - -public final class DataRouterSinkParser implements StreamFromGsonParser { - private final Gson gson; - - public static DataRouterSinkParser create() { - return new DataRouterSinkParser(gsonInstance()); - } - - private DataRouterSinkParser(Gson gson) { - this.gson = gson; - } - - public DataRouterSink unsafeParse(JsonObject input) { - assertStreamType(input, DATA_ROUTER_TYPE); - - final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME); - - return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class); - - } - -} \ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java deleted file mode 100644 index d78c3dde..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSource; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSource; - -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; - -/** - * @author Kornel Janiak - */ - -public final class DataRouterSourceParser implements StreamFromGsonParser { - private final Gson gson; - - public static DataRouterSourceParser create() { - return new DataRouterSourceParser(gsonInstance()); - } - - private DataRouterSourceParser(Gson gson) { - this.gson = gson; - } - - public DataRouterSource unsafeParse(JsonObject input) { - assertStreamType(input, DATA_ROUTER_TYPE); - - final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME); - - return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class); - - } - -} \ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java new file mode 100644 index 00000000..d34b1440 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.vavr.collection.Stream; +import java.io.IOException; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +public final class DataStreamUtils { + + public static Stream> mapJsonToStreams(JsonElement streamsJson, + DataStreamDirection direction) { + return Stream.ofAll(streamsJson.getAsJsonObject().entrySet()) + .map(namedSinkJson -> { + final JsonObject jsonObject = namedSinkJson.getValue().getAsJsonObject(); + return rawDataStream(namedSinkJson.getKey(), direction, jsonObject); + }); + } + + public static void assertStreamType( + RawDataStream json, + String expectedType, + DataStreamDirection expectedDirection) { + if (!json.type().equals(expectedType)) { + throw new IllegalArgumentException( + "Invalid stream type. Expected '" + expectedType + "', but was '" + json.type() + "'"); + } + if (json.direction() != expectedDirection) { + throw new IllegalArgumentException( + "Invalid stream direction. Expected '" + expectedDirection + "', but was '" + json.direction() + + "'"); + } + } + + public static RawDataStream readSourceFromResource(String resource) throws IOException { + return rawDataStream(resource, DataStreamDirection.SOURCE, GsonUtils.readObjectFromResource(resource)); + } + + public static RawDataStream readSinkFromResource(String resource) throws IOException { + return rawDataStream(resource, DataStreamDirection.SINK, GsonUtils.readObjectFromResource(resource)); + } + + private static RawDataStream rawDataStream(String name, DataStreamDirection direction, JsonObject json) { + return ImmutableRawDataStream.builder() + .name(name) + .direction(direction) + .type(GsonUtils.requiredString(json, "type")) + .descriptor(json) + .build(); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java deleted file mode 100644 index 2ad37a84..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import java.util.Objects; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka; - -/** - * @author Piotr Jaszczyk - * @since March 2019 - */ -abstract class GsonKafka implements Kafka { - - protected final KafkaInfo kafkaInfo; - private final AafCredentials aafCredentials; - - GsonKafka(@NotNull KafkaInfo kafkaInfo, - @Nullable AafCredentials aafCredentials) { - this.kafkaInfo = Objects.requireNonNull(kafkaInfo, "kafkaInfo"); - this.aafCredentials = aafCredentials; - } - - @Override - public String bootstrapServers() { - return kafkaInfo.bootstrapServers(); - } - - @Override - public String topicName() { - return kafkaInfo.topicName(); - } - - @Override - public @Nullable AafCredentials aafCredentials() { - return aafCredentials; - } - - @Override - public @Nullable String clientRole() { - return kafkaInfo.clientRole(); - } - - @Override - public @Nullable String clientId() { - return kafkaInfo.clientId(); - } - - @Override - public int maxPayloadSizeBytes() { - return kafkaInfo.maxPayloadSizeBytes(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - GsonKafka gsonKafka = (GsonKafka) o; - return kafkaInfo.equals(gsonKafka.kafkaInfo) && - Objects.equals(aafCredentials, gsonKafka.aafCredentials); - } - - @Override - public int hashCode() { - return Objects.hash(kafkaInfo, aafCredentials); - } -} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java deleted file mode 100644 index c45f8470..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; - -/** - * @author Piotr Jaszczyk - * @since March 2019 - */ -class GsonKafkaSink extends GsonKafka implements KafkaSink { - - GsonKafkaSink( - @NotNull KafkaInfo kafkaInfo, - @Nullable AafCredentials aafCredentials) { - super(kafkaInfo, aafCredentials); - } -} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java deleted file mode 100644 index 1509d9d7..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; - -/** - * @author Piotr Jaszczyk - * @since March 2019 - */ -class GsonKafkaSource extends GsonKafka implements KafkaSource { - - GsonKafkaSource( - @NotNull KafkaInfo kafkaInfo, - @Nullable AafCredentials aafCredentials) { - super(kafkaInfo, aafCredentials); - } - - @Override - public @Nullable String consumerGroupId() { - return kafkaInfo.consumerGroupId(); - } - -} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java deleted file mode 100644 index 10c00e72..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouter; - -import java.util.Objects; - -/** - * @author Kornel Janiak - */ - -abstract class GsonMessageRouter implements MessageRouter { - private final MessageRouterDmaapInfo dmaapInfo; - private final AafCredentials aafCredentials; - - GsonMessageRouter(@NotNull MessageRouterDmaapInfo dmaapInfo, - @Nullable AafCredentials aafCredentials) { - this.dmaapInfo = Objects.requireNonNull(dmaapInfo, "dmaapInfo"); - this.aafCredentials = aafCredentials; - } - - @Override - public String topicUrl() { - return dmaapInfo.topicUrl(); - } - - @Override - public @Nullable String clientRole() { - return dmaapInfo.clientRole(); - } - - @Override - public @Nullable String clientId() { - return dmaapInfo.clientId(); - } - - @Override - public @Nullable String location() { - return dmaapInfo.location(); - } - - @Override - public @Nullable AafCredentials aafCredentials() { - return aafCredentials; - } -} \ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java deleted file mode 100644 index da218420..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; - -/** - * @author Kornel Janiak - */ - -public class GsonMessageRouterSink extends GsonMessageRouter implements MessageRouterSink { - GsonMessageRouterSink( - @NotNull MessageRouterDmaapInfo dmaapInfo, - @Nullable AafCredentials aafCredentials) { - super(dmaapInfo, aafCredentials); - } -} \ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java deleted file mode 100644 index b69c53db..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource; - -/** - * @author Kornel Janiak - */ - -public class GsonMessageRouterSource extends GsonMessageRouter implements MessageRouterSource { - GsonMessageRouterSource( - @NotNull MessageRouterDmaapInfo dmaapInfo, - @Nullable AafCredentials aafCredentials) { - super(dmaapInfo, aafCredentials); - } -} \ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java index a0880165..0b662286 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java @@ -26,14 +26,14 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import io.vavr.Lazy; - +import io.vavr.control.Option; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.util.Map.Entry; import java.util.stream.Collectors; - -import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.GsonAdaptersMessageRouterDmaapInfo; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.GsonAdaptersKafkaInfo; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSource; @@ -42,7 +42,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma * @author Piotr Jaszczyk * @since March 2019 */ -final class GsonUtils { +public final class GsonUtils { + private static final Lazy GSON = Lazy.of(() -> { GsonBuilder gsonBuilder = new GsonBuilder(); gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersKafkaInfo()); @@ -56,39 +57,35 @@ final class GsonUtils { private GsonUtils() { } - static Gson gsonInstance() { + public static Gson gsonInstance() { return GSON.get(); } - static void assertStreamType(JsonObject json, String expectedType) { - final String actualType = requiredString(json, "type"); - if (!actualType.equals(expectedType)) { - throw new IllegalArgumentException("Invalid stream type. Expected '" + expectedType + "', but was '" + actualType + "'"); - } - } - - static String requiredString(JsonObject parent, String childName) { + public static String requiredString(JsonObject parent, String childName) { return requiredChild(parent, childName).getAsString(); } - static Option optionalString(JsonObject parent, String childName) { + public static Option optionalString(JsonObject parent, String childName) { return Option.of(parent.get(childName).getAsString()); } - static JsonElement requiredChild(JsonObject parent, String childName) { - if (parent.has(childName)) { - return parent.get(childName); - } else { - throw new IllegalArgumentException( - "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + stringifyChildrenNames(parent)); - } + public static JsonElement requiredChild(JsonObject parent, String childName) { + return optionalChild(parent, childName) + .getOrElseThrow(() -> new IllegalArgumentException( + "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + + stringifyChildrenNames(parent))); + + } + + public static Option optionalChild(JsonObject parent, String childName) { + return Option.of(parent.get(childName)); } - static JsonObject readObjectFromResource(String resource) throws IOException { + public static JsonObject readObjectFromResource(String resource) throws IOException { return readFromResource(resource).getAsJsonObject(); } - static JsonElement readFromResource(String resource) throws IOException { + public static JsonElement readFromResource(String resource) throws IOException { try (Reader reader = new InputStreamReader(GsonUtils.class.getResourceAsStream(resource))) { return new JsonParser().parse(reader); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java deleted file mode 100644 index 8b17a8d4..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import com.google.gson.annotations.SerializedName; -import org.immutables.gson.Gson; -import org.immutables.value.Value; -import org.jetbrains.annotations.Nullable; - -/** - * @author Piotr Jaszczyk - * @since March 2019 - */ -@Value.Immutable -@Gson.TypeAdapters -public interface KafkaInfo { - - @SerializedName("bootstrap_servers") - String bootstrapServers(); - - @SerializedName("topic_name") - String topicName(); - - @SerializedName("consumer_group_id") - @Nullable String consumerGroupId(); - - @SerializedName("client_role") - @Nullable String clientRole(); - - @SerializedName("client_id") - @Nullable String clientId(); - - @Value.Default - @SerializedName("max_payload_size_bytes") - default int maxPayloadSizeBytes() { - return 1024 * 1024; - } -} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java deleted file mode 100644 index f9a546c2..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; - -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE; - -/** - * @author Piotr Jaszczyk - * @since 1.1.4 - */ -public final class KafkaSinkParser implements StreamFromGsonParser { - private final Gson gson; - - public static KafkaSinkParser create() { - return new KafkaSinkParser(gsonInstance()); - } - - private KafkaSinkParser(Gson gson) { - this.gson = gson; - } - - @Override - public KafkaSink unsafeParse(JsonObject input) { - assertStreamType(input, KAFKA_TYPE); - - final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME); - final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); - - return new GsonKafkaSink(kafkaInfo, null); - } -} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java deleted file mode 100644 index 08c02b47..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; - -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE; - -/** - * @author Piotr Jaszczyk - * @since 1.1.4 - */ -public final class KafkaSourceParser implements StreamFromGsonParser { - private final Gson gson; - - public static KafkaSourceParser create() { - return new KafkaSourceParser(gsonInstance()); - } - - private KafkaSourceParser(Gson gson) { - this.gson = gson; - } - - @Override - public KafkaSource unsafeParse(JsonObject input) { - assertStreamType(input, KAFKA_TYPE); - - final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME); - final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); - - return new GsonKafkaSource(kafkaInfo, null); - } -} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java deleted file mode 100644 index ced5ad55..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import com.google.gson.annotations.SerializedName; -import org.immutables.gson.Gson; -import org.immutables.value.Value; -import org.jetbrains.annotations.Nullable; - -@Gson.TypeAdapters -@Value.Immutable -public interface MessageRouterDmaapInfo { - - @SerializedName("topic_url") - String topicUrl(); - - @SerializedName("client_role") - @Nullable String clientRole(); - - @SerializedName("client_id") - @Nullable String clientId(); - - @SerializedName("location") - @Nullable String location(); -} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java deleted file mode 100644 index 56f53932..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; - -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; - -public final class MessageRouterSinkParser implements StreamFromGsonParser { - - private final Gson gson; - - public static MessageRouterSinkParser create() { - return new MessageRouterSinkParser(gsonInstance()); - } - - private MessageRouterSinkParser(Gson gson) { - this.gson = gson; - } - - public MessageRouterSink unsafeParse(JsonObject input) { - assertStreamType(input, MESSAGE_ROUTER_TYPE); - - final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class); - - final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME); - final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class); - - return new GsonMessageRouterSink(dmaapInfo, aafCredentials); - - } -} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java deleted file mode 100644 index 25cf9e0e..00000000 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource; - -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; - -/** - * @author Kornel Janiak - */ - -public final class MessageRouterSourceParser implements StreamFromGsonParser { - private final Gson gson; - - public static MessageRouterSourceParser create() { - return new MessageRouterSourceParser(gsonInstance()); - } - - private MessageRouterSourceParser(Gson gson) { - this.gson = gson; - } - - public MessageRouterSource unsafeParse(JsonObject input) { - assertStreamType(input, MESSAGE_ROUTER_TYPE); - - final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class); - - final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME); - final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class); - - return new GsonMessageRouterSource(dmaapInfo, aafCredentials); - - } - -} \ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java new file mode 100644 index 00000000..42b86aee --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java @@ -0,0 +1,61 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink; + +/** + * @author Kornel Janiak + */ + +public final class DataRouterSinkParser implements StreamFromGsonParser { + private final Gson gson; + + public static DataRouterSinkParser create() { + return new DataRouterSinkParser(gsonInstance()); + } + + private DataRouterSinkParser(Gson gson) { + this.gson = gson; + } + + @Override + public DataRouterSink unsafeParse(RawDataStream input) { + assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SINK); + + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); + return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class).withName(input.name()); + + } + +} \ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java new file mode 100644 index 00000000..7d29a90f --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java @@ -0,0 +1,61 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSource; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSource; + +/** + * @author Kornel Janiak + */ + +public final class DataRouterSourceParser implements StreamFromGsonParser { + private final Gson gson; + + public static DataRouterSourceParser create() { + return new DataRouterSourceParser(gsonInstance()); + } + + private DataRouterSourceParser(Gson gson) { + this.gson = gson; + } + + @Override + public DataRouterSource unsafeParse(RawDataStream input) { + assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SOURCE); + + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); + return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class).withName(input.name()); + + } + +} \ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java new file mode 100644 index 00000000..c5d254f0 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java @@ -0,0 +1,73 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouter; + +import java.util.Objects; + +/** + * @author Kornel Janiak + */ + +abstract class GsonMessageRouter implements MessageRouter { + private final String name; + private final MessageRouterDmaapInfo dmaapInfo; + private final AafCredentials aafCredentials; + + GsonMessageRouter(String name, @NotNull MessageRouterDmaapInfo dmaapInfo, + @Nullable AafCredentials aafCredentials) { + this.name = name; + this.dmaapInfo = Objects.requireNonNull(dmaapInfo, "dmaapInfo"); + this.aafCredentials = aafCredentials; + } + + public String name() { + return name; + } + + @Override + public String topicUrl() { + return dmaapInfo.topicUrl(); + } + + @Override + public @Nullable String clientRole() { + return dmaapInfo.clientRole(); + } + + @Override + public @Nullable String clientId() { + return dmaapInfo.clientId(); + } + + @Override + public @Nullable String location() { + return dmaapInfo.location(); + } + + @Override + public @Nullable AafCredentials aafCredentials() { + return aafCredentials; + } +} \ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java new file mode 100644 index 00000000..5eef48d9 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; + +/** + * @author Kornel Janiak + */ + +public class GsonMessageRouterSink extends GsonMessageRouter implements MessageRouterSink { + GsonMessageRouterSink( + String name, @NotNull MessageRouterDmaapInfo dmaapInfo, + @Nullable AafCredentials aafCredentials) { + super(name, dmaapInfo, aafCredentials); + } +} \ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java new file mode 100644 index 00000000..d93a1d50 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource; + +/** + * @author Kornel Janiak + */ + +public class GsonMessageRouterSource extends GsonMessageRouter implements MessageRouterSource { + GsonMessageRouterSource( + String name, @NotNull MessageRouterDmaapInfo dmaapInfo, + @Nullable AafCredentials aafCredentials) { + super(name, dmaapInfo, aafCredentials); + } +} \ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java new file mode 100644 index 00000000..0ce0f80e --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import com.google.gson.annotations.SerializedName; +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; + +@Gson.TypeAdapters +@Value.Immutable +public interface MessageRouterDmaapInfo { + + @SerializedName("topic_url") + String topicUrl(); + + @SerializedName("client_role") + @Nullable String clientRole(); + + @SerializedName("client_id") + @Nullable String clientId(); + + @SerializedName("location") + @Nullable String location(); +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java new file mode 100644 index 00000000..1f518fe8 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java @@ -0,0 +1,62 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; + +public final class MessageRouterSinkParser implements StreamFromGsonParser { + + private final Gson gson; + + public static MessageRouterSinkParser create() { + return new MessageRouterSinkParser(gsonInstance()); + } + + private MessageRouterSinkParser(Gson gson) { + this.gson = gson; + } + + @Override + public MessageRouterSink unsafeParse(RawDataStream input) { + assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SINK); + + final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class); + + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); + final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class); + + return new GsonMessageRouterSink(input.name(), dmaapInfo, aafCredentials); + + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java new file mode 100644 index 00000000..c6c1b22c --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource; + +/** + * @author Kornel Janiak + */ + +public final class MessageRouterSourceParser implements StreamFromGsonParser { + private final Gson gson; + + public static MessageRouterSourceParser create() { + return new MessageRouterSourceParser(gsonInstance()); + } + + private MessageRouterSourceParser(Gson gson) { + this.gson = gson; + } + + @Override + public MessageRouterSource unsafeParse(RawDataStream input) { + assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SOURCE); + + final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class); + + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); + final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class); + + return new GsonMessageRouterSource(input.name(), dmaapInfo, aafCredentials); + + } + +} \ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java new file mode 100644 index 00000000..ad9b021e --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java @@ -0,0 +1,99 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import java.util.Objects; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +abstract class GsonKafka implements Kafka { + + private final String name; + final KafkaInfo kafkaInfo; + private final AafCredentials aafCredentials; + + GsonKafka( + @NotNull String name, + @NotNull KafkaInfo kafkaInfo, + @Nullable AafCredentials aafCredentials) { + this.name = Objects.requireNonNull(name, "name"); + this.kafkaInfo = Objects.requireNonNull(kafkaInfo, "kafkaInfo"); + this.aafCredentials = aafCredentials; + } + + public String name() { + return name; + } + + @Override + public String bootstrapServers() { + return kafkaInfo.bootstrapServers(); + } + + @Override + public String topicName() { + return kafkaInfo.topicName(); + } + + @Override + public @Nullable AafCredentials aafCredentials() { + return aafCredentials; + } + + @Override + public @Nullable String clientRole() { + return kafkaInfo.clientRole(); + } + + @Override + public @Nullable String clientId() { + return kafkaInfo.clientId(); + } + + @Override + public int maxPayloadSizeBytes() { + return kafkaInfo.maxPayloadSizeBytes(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GsonKafka gsonKafka = (GsonKafka) o; + return kafkaInfo.equals(gsonKafka.kafkaInfo) && + Objects.equals(aafCredentials, gsonKafka.aafCredentials); + } + + @Override + public int hashCode() { + return Objects.hash(kafkaInfo, aafCredentials); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java new file mode 100644 index 00000000..4990f80a --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +class GsonKafkaSink extends GsonKafka implements KafkaSink { + + GsonKafkaSink( + @NotNull String name, + @NotNull KafkaInfo kafkaInfo, + @Nullable AafCredentials aafCredentials) { + super(name, kafkaInfo, aafCredentials); + } + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java new file mode 100644 index 00000000..137964c3 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +class GsonKafkaSource extends GsonKafka implements KafkaSource { + + GsonKafkaSource( + @NotNull String name, + @NotNull KafkaInfo kafkaInfo, + @Nullable AafCredentials aafCredentials) { + super(name, kafkaInfo, aafCredentials); + } + + @Override + public @Nullable String consumerGroupId() { + return kafkaInfo.consumerGroupId(); + } + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java new file mode 100644 index 00000000..fd5602e6 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import com.google.gson.annotations.SerializedName; +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +@Value.Immutable +@Gson.TypeAdapters +public interface KafkaInfo { + + @SerializedName("bootstrap_servers") + String bootstrapServers(); + + @SerializedName("topic_name") + String topicName(); + + @SerializedName("consumer_group_id") + @Nullable String consumerGroupId(); + + @SerializedName("client_role") + @Nullable String clientRole(); + + @SerializedName("client_id") + @Nullable String clientId(); + + @Value.Default + @SerializedName("max_payload_size_bytes") + default int maxPayloadSizeBytes() { + return 1024 * 1024; + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java new file mode 100644 index 00000000..59373c45 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +public final class KafkaSinkParser implements StreamFromGsonParser { + private final Gson gson; + + public static KafkaSinkParser create() { + return new KafkaSinkParser(gsonInstance()); + } + + private KafkaSinkParser(Gson gson) { + this.gson = gson; + } + + @Override + public KafkaSink unsafeParse(RawDataStream input) { + assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SINK); + final JsonObject json = input.descriptor(); + + final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json); + final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull(); + + return new GsonKafkaSink(input.name(), kafkaInfo, aafCreds); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java new file mode 100644 index 00000000..6ac1dc99 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +public final class KafkaSourceParser implements StreamFromGsonParser { + private final Gson gson; + + public static KafkaSourceParser create() { + return new KafkaSourceParser(gsonInstance()); + } + + private KafkaSourceParser(Gson gson) { + this.gson = gson; + } + + @Override + public KafkaSource unsafeParse(RawDataStream input) { + assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SOURCE); + final JsonObject json = input.descriptor(); + + final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json); + final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull(); + + return new GsonKafkaSource(input.name(), kafkaInfo, aafCreds); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java new file mode 100644 index 00000000..4cfa33ac --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.optionalChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +final class KafkaUtils { + + private KafkaUtils() { + } + + static KafkaInfo extractKafkaInfo(Gson gson, JsonObject input) { + final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info"); + return gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + } + + static Option extractAafCredentials(Gson gson, JsonObject input) { + return optionalChild(input, "aaf_credentials") + .map(aafCredsJson -> gson.fromJson(aafCredsJson, ImmutableAafCredentials.class)); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java index e8d63192..c3c70b78 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java @@ -36,9 +36,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; @Gson.TypeAdapters public interface AafCredentials { - @SerializedName("aaf_username") + @SerializedName(value = "username", alternate = "aaf_username") @Nullable String username(); - @SerializedName("aaf_password") + @SerializedName(value = "password", alternate = "aaf_password") @Nullable String password(); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java index 43d9d726..37bf7e57 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; +import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; /** @@ -28,5 +29,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; */ @ExperimentalApi public interface DataStream { - + @Value.Default + default String name() { + return ""; + } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java new file mode 100644 index 00000000..f3cac547 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +public enum DataStreamDirection { + + SINK("streams_publishes"), + SOURCE("streams_subscribes"); + + private final String configurationKey; + + DataStreamDirection(String configurationKey) { + this.configurationKey = configurationKey; + } + + public String configurationKey() { + return configurationKey; + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java new file mode 100644 index 00000000..7a39ede5 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; + +import org.immutables.value.Value; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +@Value.Immutable +public interface RawDataStream { + String name(); + String type(); + DataStreamDirection direction(); + T descriptor(); +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java index 97f07a29..1810fc6c 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java @@ -20,6 +20,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; +import static io.vavr.Predicates.not; + +import io.vavr.collection.List; import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; @@ -46,4 +49,9 @@ public interface Kafka { default int maxPayloadSizeBytes() { return 1024 * 1024; } + + @Value.Derived + default List bootstrapServerList() { + return List.of(bootstrapServers().split(",")).filter(not(String::isEmpty)); + } } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java index c9ceeaf1..8a5edcc9 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java @@ -20,17 +20,15 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; +import static org.assertj.core.api.Assertions.assertThat; + import com.google.gson.JsonArray; import com.google.gson.JsonObject; import io.vavr.collection.List; +import java.math.BigInteger; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; -import java.math.BigInteger; - - -import static org.assertj.core.api.Assertions.assertThat; - class MerkleTreeParserTest { private final MerkleTreeParser cut = new MerkleTreeParser(); diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java index e862d849..e2833fe5 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java @@ -20,20 +20,29 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; +import static org.assertj.core.api.Assertions.assertThat; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendResource; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendString; import com.google.gson.JsonObject; +import io.vavr.collection.Map; import io.vavr.collection.Stream; import java.time.Duration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -128,6 +137,91 @@ class CbsClientImplIT { .verify(Duration.ofSeconds(5)); } + @Test + void testCbsClientWithStreamsParsing() { + // given + final Mono sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final StreamFromGsonParser kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); + final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + + // when + final Mono result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)) + .map(json -> + DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head() + ); + + // then + StepVerifier.create(result) + .consumeNextWith(kafkaSink -> { + assertThat(kafkaSink.name()).isEqualTo("perf3gpp"); + assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060"); + assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP"); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithStreamsParsingUsingSwitch() { + // given + final Mono sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + // TODO: Use these parsers below + final StreamFromGsonParser kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); + final StreamFromGsonParser mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser(); + + // when + final Mono result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)) + .map(json -> { + final Map>> sinks = DataStreams.namedSinks(json) + .groupBy(RawDataStream::type); + + final Stream allKafkaSinks = sinks.getOrElse("kafka", Stream.empty()) + .map(kafkaSinkParser::unsafeParse); + final Stream allMrSinks = sinks.getOrElse("message_router", Stream.empty()) + .map(mrSinkParser::unsafeParse); + + assertThat(allKafkaSinks.size()) + .describedAs("Number of kafka sinks") + .isEqualTo(2); + assertThat(allMrSinks.size()) + .describedAs("Number of DMAAP-MR sinks") + .isEqualTo(1); + + return true; + }) + .then(); + + // then + StepVerifier.create(result) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithStreamsParsingWhenUsingInvalidParser() { + // given + final Mono sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final StreamFromGsonParser kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser(); + final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + + // when + final Mono result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)) + .map(json -> + DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head() + ); + + // then + StepVerifier.create(result) + .expectErrorSatisfies(ex -> { + assertThat(ex).isInstanceOf(IllegalArgumentException.class); + assertThat(ex).hasMessageContaining("Invalid stream type"); + assertThat(ex).hasMessageContaining("message_router"); + assertThat(ex).hasMessageContaining("kafka"); + }) + .verify(Duration.ofSeconds(5)); + } + private String sampleConfigValue(JsonObject obj) { return obj.get(SAMPLE_CONFIG_KEY).getAsString(); } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java index 9fd7cc88..617904f9 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java @@ -30,8 +30,8 @@ import static org.mockito.Mockito.verify; import com.google.gson.JsonObject; import java.net.InetSocketAddress; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import reactor.core.publisher.Mono; /** diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java index 6843e0e3..94ff53f9 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java @@ -31,9 +31,9 @@ import com.google.gson.JsonParser; import java.io.InputStreamReader; import java.net.InetSocketAddress; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java index d0485f57..7835a5f9 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java @@ -21,9 +21,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; import io.vavr.CheckedFunction0; -import io.vavr.Function0; -import java.io.IOException; -import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java deleted file mode 100644 index 398ebcd9..00000000 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import io.vavr.control.Either; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink; - -import java.io.IOException; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; - - -class DataRouterSinkParserTest { - private static final String SAMPLE_LOCATION = "mtc00"; - private static final String SAMPLE_PUBLISH_URL = "https://we-are-data-router.us/feed/xyz"; - private static final String SAMPLE_LOG_URL = "https://we-are-data-router.us/feed/xyz/logs"; - private static final String SAMPLE_USER = "some-user"; - private static final String SAMPLE_PASSWORD = "some-password"; - private static final String SAMPLE_PUBLISHER_ID = "123456"; - - private static final Gson gson = new Gson(); - - private final StreamFromGsonParser streamParser = StreamFromGsonParsers.dataRouterSinkParser(); - - private static final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder() - .location(SAMPLE_LOCATION) - .publishUrl(SAMPLE_PUBLISH_URL) - .logUrl(SAMPLE_LOG_URL) - .username(SAMPLE_USER) - .password(SAMPLE_PASSWORD) - .publisherId(SAMPLE_PUBLISHER_ID) - .build(); - - private static final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder() - .publishUrl(SAMPLE_PUBLISH_URL) - .build(); - - @Test - void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json"); - // when - DataRouterSink result = streamParser.unsafeParse(input); - // then - assertThat(result).isEqualTo(fullConfigurationStream); - } - - @Test - void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { - //given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_minimal.json"); - // when - DataRouterSink result = streamParser.unsafeParse(input); - // then - assertThat(result).isEqualTo(minimalConfigurationStream); - } - - @Test - void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json"); - // when - Either result = streamParser.parse(input); - // then - assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); - result.peekLeft(error -> { - assertThat(error.message()).contains("Invalid stream type"); - assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '" - + MESSAGE_ROUTER_TYPE + "'"); - } - ); - } - - @Test - void emptyConfiguration_shouldParseToStreamParserError() { - // given - JsonObject input = gson.fromJson("{}", JsonObject.class); - // when - Either result = streamParser.parse(input); - // then - assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); - } - -} \ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java deleted file mode 100644 index 681fa147..00000000 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonObject; -import io.vavr.control.Either; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*; - -import java.io.IOException; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; - -public class DataRouterSourceParserTest { - private static final String SAMPLE_LOCATION = "mtc00"; - private static final String SAMPLE_DELIVERY_URL = "https://my-subscriber-app.dcae:8080/target-path"; - private static final String SAMPLE_USER = "some-user"; - private static final String SAMPLE_PASSWORD = "some-password"; - private static final String SAMPLE_SUBSCRIBER_ID = "789012"; - - private static final Gson gson = new Gson(); - - private static final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder() - .location(SAMPLE_LOCATION) - .deliveryUrl(SAMPLE_DELIVERY_URL) - .username(SAMPLE_USER) - .password(SAMPLE_PASSWORD) - .subscriberId(SAMPLE_SUBSCRIBER_ID) - .build(); - - private static final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder() - .build(); - - - private final StreamFromGsonParser streamParser = StreamFromGsonParsers.dataRouterSourceParser(); - - @Test - void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_full.json"); - // when - DataRouterSource result = streamParser.unsafeParse(input); - // then - assertThat(result).isEqualTo(fullConfigurationStream); - } - - @Test - void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_minimal.json"); - // when - DataRouterSource result = streamParser.unsafeParse(input); - // then - assertThat(result).isEqualTo(minimalConfigurationStream); - } - - @Test - void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json"); - // when - Either result = streamParser.parse(input); - // then - assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); - result.peekLeft(error -> { - assertThat(error.message()).contains("Invalid stream type"); - assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '" - + MESSAGE_ROUTER_TYPE + "'"); - } - ); - } - - @Test - void emptyConfiguration_shouldBeParsedToStreamParserError() { - // given - JsonObject input = gson.fromJson("{}", JsonObject.class); - // when - Either result = streamParser.parse(input); - // then - assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); - } -} diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java deleted file mode 100644 index b5481203..00000000 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.in; -import static org.junit.jupiter.api.Assertions.*; - -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import io.vavr.Function1; -import io.vavr.control.Either; -import java.io.IOException; -import java.io.InputStreamReader; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; - -/** - * @author Piotr Jaszczyk - * @since March 2019 - */ -class KafkaSinkParserTest { - - private final StreamFromGsonParser cut = StreamFromGsonParsers.kafkaSinkParser(); - - @Test - void precondition_assureInstanceOf() { - assertThat(cut).isInstanceOf(KafkaSinkParser.class); - } - - @Test - void shouldParseMinimalKafkaSinkDefinition() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_minimal.json"); - - // when - final KafkaSink result = cut.unsafeParse(input); - - // then - assertThat(result.aafCredentials()).isNull(); - assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); - assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); - assertThat(result.clientId()).isNull(); - assertThat(result.clientRole()).isNull(); - } - - @Test - void shouldParseBasicKafkaSinkDefinition() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink.json"); - - // when - final KafkaSink result = cut.unsafeParse(input); - - // then - assertThat(result.aafCredentials()).isNull(); - assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); - assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); - assertThat(result.clientId()).isEqualTo("1500462518108"); - assertThat(result.clientRole()).isEqualTo("com.dcae.member"); - } - - @Test - void shouldReturnErrorWhenStructureIsWrong() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_missing_child.json"); - - // when - final Either result = cut.parse(input); - - // then - assertThat(result.isRight()).describedAs("should not be right").isFalse(); - result.peekLeft(error -> { - assertThat(error.message()).contains("kafka_info"); - }); - } - - @Test - void shouldReturnErrorWhenTypeIsWrong() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_invalid_type.json"); - - // when - final Either result = cut.parse(input); - - // then - assertThat(result.isRight()).describedAs("should not be right").isFalse(); - result.peekLeft(error -> { - assertThat(error.message()).containsIgnoringCase("invalid stream type"); - assertThat(error.message()).containsIgnoringCase("kafka"); - assertThat(error.message()).containsIgnoringCase("message_router"); - }); - } -} \ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java deleted file mode 100644 index 87131285..00000000 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.google.gson.JsonObject; -import java.io.IOException; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; - -/** - * @author Piotr Jaszczyk - * @since March 2019 - */ -class KafkaSourceParserTest { - - private final StreamFromGsonParser cut = StreamFromGsonParsers.kafkaSourceParser(); - - @Test - void precondition_assureInstanceOf() { - assertThat(cut).isInstanceOf(KafkaSourceParser.class); - } - - @Test - void shouldParseMinimalKafkaSourceDefinition() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_source_minimal.json"); - - // when - final KafkaSource result = cut.unsafeParse(input); - - // then - assertThat(result.aafCredentials()).isNull(); - assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); - assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); - assertThat(result.clientId()).isNull(); - assertThat(result.clientRole()).isNull(); - } -} \ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java deleted file mode 100644 index 63b04d1d..00000000 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import io.vavr.control.Either; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; - -import java.io.IOException; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; - -/** - * @author Kornel Janiak - */ - -public class MessageRouterSinkParserTest { - - private static final String SAMPLE_AAF_USERNAME = "some-user"; - private static final String SAMPLE_AAF_PASSWORD = "some-password"; - private static final String SAMPLE_LOCATION = "mtc00"; - private static final String SAMPLE_CLIENT_ROLE = "com.dcae.member"; - private static final String SAMPLE_CLIENT_ID = "1500462518108"; - private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic"; - - private static final Gson gson = new Gson(); - - private final StreamFromGsonParser streamParser = StreamFromGsonParsers.messageRouterSinkParser(); - - @Test - void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json"); - // when - MessageRouterSink result = streamParser.unsafeParse(input); - // then - assertThat(result).isInstanceOf(MessageRouterSink.class); - assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME); - assertThat(result.aafCredentials().password()).isEqualTo(SAMPLE_AAF_PASSWORD); - assertThat(result.location()).isEqualTo(SAMPLE_LOCATION); - assertThat(result.clientRole()).isEqualTo(SAMPLE_CLIENT_ROLE); - assertThat(result.clientId()).isEqualTo(SAMPLE_CLIENT_ID); - assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); - } - - @Test - void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json"); - - // when - MessageRouterSink result = streamParser.unsafeParse(input); - // then - assertThat(result).isInstanceOf(MessageRouterSink.class); - assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); - assertThat(result.aafCredentials().username()).isNull(); - assertThat(result.aafCredentials().password()).isNull(); - assertThat(result.clientId()).isNull(); - } - - @Test - void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json"); - // when - Either result = streamParser.parse(input); - // then - assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); - result.peekLeft(error -> { - assertThat(error.message()).contains("Invalid stream type"); - assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '" - + DATA_ROUTER_TYPE + "'"); - } - ); - } - - @Test - void emptyConfiguration_shouldParseToStreamParserError() { - // given - JsonObject input = gson.fromJson("{}", JsonObject.class); - // when - Either result = streamParser.parse(input); - // then - assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); - } - - -} \ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java deleted file mode 100644 index fea63d66..00000000 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import io.vavr.control.Either; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource; - -import java.io.IOException; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; - -/** - * @author Kornel Janiak - */ - -public class MessageRouterSourceParserTest { - - private static final String SAMPLE_AAF_USERNAME = "some-user"; - private static final String SAMPLE_AAF_PASSWORD = "some-password"; - private static final String SAMPLE_LOCATION = "mtc00"; - private static final String SAMPLE_CLIENT_ROLE = "com.dcae.member"; - private static final String SAMPLE_CLIENT_ID = "1500462518108"; - private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic"; - - private static final Gson gson = new Gson(); - - private final StreamFromGsonParser streamParser = StreamFromGsonParsers.messageRouterSourceParser(); - - @Test - void fullConfiguration_shouldGenerateDataRouterSourceObject() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json"); - // when - MessageRouterSource result = streamParser.unsafeParse(input); - // then - assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME); - assertThat(result.aafCredentials().password()).isEqualTo(SAMPLE_AAF_PASSWORD); - assertThat(result.location()).isEqualTo(SAMPLE_LOCATION); - assertThat(result.clientRole()).isEqualTo(SAMPLE_CLIENT_ROLE); - assertThat(result.clientId()).isEqualTo(SAMPLE_CLIENT_ID); - assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); - } - - @Test - void minimalConfiguration_shouldGenerateDataRouterSourceObject() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json"); - - // when - MessageRouterSource result = streamParser.unsafeParse(input); - // then - assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); - assertThat(result.aafCredentials().username()).isNull(); - assertThat(result.aafCredentials().password()).isNull(); - assertThat(result.clientId()).isNull(); - } - - @Test - void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json"); - // when - Either result = streamParser.parse(input); - // then - assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); - result.peekLeft(error -> { - assertThat(error.message()).contains("Invalid stream type"); - assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '" - + DATA_ROUTER_TYPE + "'"); - } - ); - } - - @Test - void emptyConfiguration_shouldParseToStreamParserError() { - // given - JsonObject input = gson.fromJson("{}", JsonObject.class); - // when - Either result = streamParser.parse(input); - // then - assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); - } - - -} \ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java new file mode 100644 index 00000000..7092de5a --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java @@ -0,0 +1,128 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import io.vavr.control.Either; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; + + +class DataRouterSinkParserTest { + + private static final String SAMPLE_LOCATION = "mtc00"; + private static final String SAMPLE_PUBLISH_URL = "https://we-are-data-router.us/feed/xyz"; + private static final String SAMPLE_LOG_URL = "https://we-are-data-router.us/feed/xyz/logs"; + private static final String SAMPLE_USER = "some-user"; + private static final String SAMPLE_PASSWORD = "some-password"; + private static final String SAMPLE_PUBLISHER_ID = "123456"; + + private final StreamFromGsonParser streamParser = StreamFromGsonParsers.dataRouterSinkParser(); + + @Test + void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json"); + + // when + DataRouterSink result = streamParser.unsafeParse(input); + + // then + final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder() + .name(input.name()) + .location(SAMPLE_LOCATION) + .publishUrl(SAMPLE_PUBLISH_URL) + .logUrl(SAMPLE_LOG_URL) + .username(SAMPLE_USER) + .password(SAMPLE_PASSWORD) + .publisherId(SAMPLE_PUBLISHER_ID) + .build(); + assertThat(result).isEqualTo(fullConfigurationStream); + } + + @Test + void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { + //given + RawDataStream input = DataStreamUtils + .readSinkFromResource("/streams/data_router_sink_minimal.json"); + + // when + DataRouterSink result = streamParser.unsafeParse(input); + + // then + final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder() + .name(input.name()) + .publishUrl(SAMPLE_PUBLISH_URL) + .build(); + assertThat(result).isEqualTo(minimalConfigurationStream); + } + + @Test + void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json"); + + // when + Either result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + result.peekLeft(error -> { + assertThat(error.message()).contains("Invalid stream type"); + assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '" + + MESSAGE_ROUTER_TYPE + "'"); + } + ); + } + + @Test + void emptyConfiguration_shouldParseToStreamParserError() { + // given + JsonObject json = new JsonObject(); + final ImmutableRawDataStream input = ImmutableRawDataStream.builder() + .name("empty") + .type("data_router") + .descriptor(json) + .direction(DataStreamDirection.SINK) + .build(); + + // when + Either result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + } + +} \ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java new file mode 100644 index 00000000..b2d01309 --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java @@ -0,0 +1,126 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; +import io.vavr.control.Either; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; + +public class DataRouterSourceParserTest { + + private static final String SAMPLE_LOCATION = "mtc00"; + private static final String SAMPLE_DELIVERY_URL = "https://my-subscriber-app.dcae:8080/target-path"; + private static final String SAMPLE_USER = "some-user"; + private static final String SAMPLE_PASSWORD = "some-password"; + private static final String SAMPLE_SUBSCRIBER_ID = "789012"; + + private final StreamFromGsonParser streamParser = StreamFromGsonParsers.dataRouterSourceParser(); + + @Test + void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSourceFromResource("/streams/data_router_source_full.json"); + + // when + DataRouterSource result = streamParser.unsafeParse(input); + + // then + + final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder() + .name(input.name()) + .location(SAMPLE_LOCATION) + .deliveryUrl(SAMPLE_DELIVERY_URL) + .username(SAMPLE_USER) + .password(SAMPLE_PASSWORD) + .subscriberId(SAMPLE_SUBSCRIBER_ID) + .build(); + assertThat(result).isEqualTo(fullConfigurationStream); + } + + @Test + void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { + // given + RawDataStream input = DataStreamUtils + .readSourceFromResource("/streams/data_router_source_minimal.json"); + + // when + DataRouterSource result = streamParser.unsafeParse(input); + + // then + final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder() + .name(input.name()) + .build(); + assertThat(result).isEqualTo(minimalConfigurationStream); + } + + @Test + void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json"); + + // when + Either result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + result.peekLeft(error -> { + assertThat(error.message()).contains("Invalid stream type"); + assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '" + + MESSAGE_ROUTER_TYPE + "'"); + } + ); + } + + @Test + void emptyConfiguration_shouldBeParsedToStreamParserError() { + // given + JsonObject json = new JsonObject(); + final ImmutableRawDataStream input = ImmutableRawDataStream.builder() + .name("empty") + .type("data_router") + .descriptor(json) + .direction(DataStreamDirection.SOURCE) + .build(); + + // when + Either result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + } +} diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java new file mode 100644 index 00000000..4d3b88b8 --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java @@ -0,0 +1,128 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import io.vavr.control.Either; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; + +/** + * @author Kornel Janiak + */ + +public class MessageRouterSinkParserTest { + + private static final String SAMPLE_AAF_USERNAME = "some-user"; + private static final String SAMPLE_AAF_PASSWORD = "some-password"; + private static final String SAMPLE_LOCATION = "mtc00"; + private static final String SAMPLE_CLIENT_ROLE = "com.dcae.member"; + private static final String SAMPLE_CLIENT_ID = "1500462518108"; + private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic"; + + private final StreamFromGsonParser streamParser = StreamFromGsonParsers.messageRouterSinkParser(); + + @Test + void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json"); + + // when + MessageRouterSink result = streamParser.unsafeParse(input); + + // then + assertThat(result).isInstanceOf(MessageRouterSink.class); + assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME); + assertThat(result.aafCredentials().password()).isEqualTo(SAMPLE_AAF_PASSWORD); + assertThat(result.location()).isEqualTo(SAMPLE_LOCATION); + assertThat(result.clientRole()).isEqualTo(SAMPLE_CLIENT_ROLE); + assertThat(result.clientId()).isEqualTo(SAMPLE_CLIENT_ID); + assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); + } + + @Test + void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSinkFromResource("/streams/message_router_minimal.json"); + + // when + MessageRouterSink result = streamParser.unsafeParse(input); + + // then + assertThat(result).isInstanceOf(MessageRouterSink.class); + assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); + assertThat(result.aafCredentials().username()).isNull(); + assertThat(result.aafCredentials().password()).isNull(); + assertThat(result.clientId()).isNull(); + } + + @Test + void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json"); + + // when + Either result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + result.peekLeft(error -> { + assertThat(error.message()).contains("Invalid stream type"); + assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '" + + DATA_ROUTER_TYPE + "'"); + } + ); + } + + @Test + void emptyConfiguration_shouldParseToStreamParserError() { + // given + JsonObject json = new JsonObject(); + final ImmutableRawDataStream input = ImmutableRawDataStream.builder() + .name("empty") + .type("data_router") + .descriptor(json) + .direction(DataStreamDirection.SINK) + .build(); + + // when + Either result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + } + + +} \ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java new file mode 100644 index 00000000..d497817f --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java @@ -0,0 +1,124 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import io.vavr.control.Either; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; + +/** + * @author Kornel Janiak + */ + +public class MessageRouterSourceParserTest { + + private static final String SAMPLE_AAF_USERNAME = "some-user"; + private static final String SAMPLE_AAF_PASSWORD = "some-password"; + private static final String SAMPLE_LOCATION = "mtc00"; + private static final String SAMPLE_CLIENT_ROLE = "com.dcae.member"; + private static final String SAMPLE_CLIENT_ID = "1500462518108"; + private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic"; + + private final StreamFromGsonParser streamParser = StreamFromGsonParsers.messageRouterSourceParser(); + + @Test + void fullConfiguration_shouldGenerateDataRouterSourceObject() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json"); + + // when + MessageRouterSource result = streamParser.unsafeParse(input); + + // then + assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME); + assertThat(result.aafCredentials().password()).isEqualTo(SAMPLE_AAF_PASSWORD); + assertThat(result.location()).isEqualTo(SAMPLE_LOCATION); + assertThat(result.clientRole()).isEqualTo(SAMPLE_CLIENT_ROLE); + assertThat(result.clientId()).isEqualTo(SAMPLE_CLIENT_ID); + assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); + } + + @Test + void minimalConfiguration_shouldGenerateDataRouterSourceObject() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSourceFromResource("/streams/message_router_minimal.json"); + + // when + MessageRouterSource result = streamParser.unsafeParse(input); + + // then + assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); + assertThat(result.aafCredentials().username()).isNull(); + assertThat(result.aafCredentials().password()).isNull(); + assertThat(result.clientId()).isNull(); + } + + @Test + void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSourceFromResource("/streams/data_router_sink_full.json"); + + // when + Either result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + result.peekLeft(error -> { + assertThat(error.message()).contains("Invalid stream type"); + assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '" + + DATA_ROUTER_TYPE + "'"); + } + ); + } + + @Test + void emptyConfiguration_shouldParseToStreamParserError() { + // given + JsonObject json = new JsonObject(); + final ImmutableRawDataStream input = ImmutableRawDataStream.builder() + .name("empty") + .type("data_router") + .descriptor(json) + .direction(DataStreamDirection.SOURCE) + .build(); + // when + Either result = streamParser.parse(input); + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + } + + +} \ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java new file mode 100644 index 00000000..5974639c --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java @@ -0,0 +1,133 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.gson.JsonObject; +import io.vavr.control.Either; +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +class KafkaSinkParserTest { + + private final StreamFromGsonParser cut = StreamFromGsonParsers.kafkaSinkParser(); + + @Test + void precondition_assureInstanceOf() { + assertThat(cut).isInstanceOf(KafkaSinkParser.class); + } + + @Test + void shouldParseMinimalKafkaSinkDefinition() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_minimal.json"); + + // when + final KafkaSink result = cut.unsafeParse(input); + + // then + assertThat(result.aafCredentials()).isNull(); + assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); + assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); + assertThat(result.clientId()).isNull(); + assertThat(result.clientRole()).isNull(); + } + + @Test + void shouldParseFullKafkaSinkDefinition() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink.json"); + + // when + final KafkaSink result = cut.unsafeParse(input); + + // then + final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder() + .username("the user") + .password("the passwd") + .build(); + assertThat(result.aafCredentials()).isEqualTo(expectedCredentials); + assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); + assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); + assertThat(result.clientId()).isEqualTo("1500462518108"); + assertThat(result.clientRole()).isEqualTo("com.dcae.member"); + } + + @Test + void shouldReturnErrorWhenStructureIsWrong() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_missing_child.json"); + + // when + final Either result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).contains("kafka_info"); + }); + } + + @Test + void shouldReturnErrorWhenTypeIsWrong() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSinkFromResource("/streams/kafka_invalid_type.json"); + + // when + final Either result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream type"); + assertThat(error.message()).containsIgnoringCase("kafka"); + assertThat(error.message()).containsIgnoringCase("message_router"); + }); + } + + @Test + void shouldReturnErrorWhenDirectionIsWrong() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSourceFromResource("/streams/kafka_sink.json"); + + // when + final Either result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream direction"); + }); + } +} \ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java new file mode 100644 index 00000000..d255d99a --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java @@ -0,0 +1,120 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.gson.JsonObject; +import io.vavr.collection.List; +import io.vavr.control.Either; +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +class KafkaSourceParserTest { + + private final StreamFromGsonParser cut = StreamFromGsonParsers.kafkaSourceParser(); + + @Test + void precondition_assureInstanceOf() { + assertThat(cut).isInstanceOf(KafkaSourceParser.class); + } + + @Test + void shouldParseMinimalKafkaSourceDefinition() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSourceFromResource("/streams/kafka_source_minimal.json"); + + // when + final KafkaSource result = cut.unsafeParse(input); + + // then + assertThat(result.aafCredentials()).isNull(); + assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); + assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); + assertThat(result.clientId()).isNull(); + assertThat(result.clientRole()).isNull(); + } + + @Test + void shouldParseFullKafkaSourceDefinition() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSourceFromResource("/streams/kafka_source.json"); + + // when + final KafkaSource result = cut.unsafeParse(input); + + // then + final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder() + .username("the user") + .password("the passwd") + .build(); + assertThat(result.aafCredentials()).isEqualTo(expectedCredentials); + assertThat(result.bootstrapServerList()).isEqualTo(List.of("dmaap-mr-kafka-0:6060", "dmaap-mr-kafka-1:6060")); + assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); + assertThat(result.consumerGroupId()).isEqualTo("nokia-perf3gpp-processor"); + assertThat(result.clientId()).isEqualTo("1500462518108"); + assertThat(result.clientRole()).isEqualTo("com.dcae.member"); + } + + @Test + void shouldReturnErrorWhenTypeIsWrong() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSourceFromResource("/streams/kafka_invalid_type.json"); + + // when + final Either result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream type"); + assertThat(error.message()).containsIgnoringCase("kafka"); + assertThat(error.message()).containsIgnoringCase("message_router"); + }); + } + + @Test + void shouldReturnErrorWhenDirectionIsWrong() throws IOException { + // given + RawDataStream input = DataStreamUtils.readSinkFromResource("/streams/kafka_source.json"); + + // when + final Either result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream direction"); + }); + } +} \ No newline at end of file diff --git a/rest-services/cbs-client/src/test/resources/sample_config.json b/rest-services/cbs-client/src/test/resources/sample_config.json index a95b723f..266326f4 100644 --- a/rest-services/cbs-client/src/test/resources/sample_config.json +++ b/rest-services/cbs-client/src/test/resources/sample_config.json @@ -1,3 +1,33 @@ { - "keystore.path": "/var/run/security/keystore.p12" + "keystore.path": "/var/run/security/keystore.p12", + "streams_publishes": { + "perf3gpp": { + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka:6060", + "topic_name": "HVVES_PERF3GPP" + } + }, + "pnf_ready": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://message-router:3904/events/VES_PNF_READY" + } + }, + "call_trace": { + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka:6060", + "topic_name": "HVVES_TRACE" + } + } + }, + "streams_subscribes": { + "measurements": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://message-router:3904/events/VES_MEASUREMENT" + } + } + } } diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json b/rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json new file mode 100644 index 00000000..0ee88adb --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json @@ -0,0 +1,7 @@ +{ + "type": "message_router", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060", + "topic_name": "HVVES_PERF3GPP" + } +} diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json index b60388d5..e7b45508 100644 --- a/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json @@ -1,5 +1,9 @@ { "type": "kafka", + "aaf_credentials": { + "username": "the user", + "password": "the passwd" + }, "kafka_info": { "client_role": "com.dcae.member", "client_id": "1500462518108", diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json deleted file mode 100644 index 0ee88adb..00000000 --- a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "type": "message_router", - "kafka_info": { - "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060", - "topic_name": "HVVES_PERF3GPP" - } -} diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_source.json b/rest-services/cbs-client/src/test/resources/streams/kafka_source.json new file mode 100644 index 00000000..379dbef1 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_source.json @@ -0,0 +1,14 @@ +{ + "type": "kafka", + "aaf_credentials": { + "username": "the user", + "password": "the passwd" + }, + "kafka_info": { + "client_role": "com.dcae.member", + "client_id": "1500462518108", + "bootstrap_servers": "dmaap-mr-kafka-0:6060,,dmaap-mr-kafka-1:6060,", + "topic_name": "HVVES_PERF3GPP", + "consumer_group_id": "nokia-perf3gpp-processor" + } +} -- cgit 1.2.3-korg