diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-03-14 14:34:35 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-03-19 15:06:15 +0100 |
commit | 565ec734f46a15bb3c87fe02a32613fc86c0eb22 (patch) | |
tree | 71412d433e9e952db2e2f9db6044dc1a9643e9c0 /rest-services | |
parent | 9b03823dc6ac4bec2e61a4e54d114a518dbb1100 (diff) |
Kafka streams parsers - additions
Change-Id: I98ca661682b41d76d3de668d6faeb6ebe02f92a8
Issue-ID: DCAEGEN2-1341
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'rest-services')
46 files changed, 853 insertions, 257 deletions
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java index 36589dad..989bd2db 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java @@ -20,9 +20,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsLookup; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import reactor.core.publisher.Mono; diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java index 15c4eea2..dfd0e2f7 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java @@ -19,17 +19,16 @@ */ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; +import static java.lang.String.valueOf; + import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.vavr.collection.List; -import org.jetbrains.annotations.NotNull; - import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; - -import static java.lang.String.valueOf; +import org.jetbrains.annotations.NotNull; /** diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java new file mode 100644 index 00000000..4fdb31b1 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.vavr.collection.Stream; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +@ExperimentalApi +public final class DataStreams { + + private DataStreams() { + } + + public static Stream<RawDataStream<JsonObject>> namedSources(JsonObject rootJson) { + return createCollectionOfStreams(rootJson, DataStreamDirection.SOURCE); + } + + public static Stream<RawDataStream<JsonObject>> namedSinks(JsonObject rootJson) { + return createCollectionOfStreams(rootJson, DataStreamDirection.SINK); + } + + private static Stream<RawDataStream<JsonObject>> createCollectionOfStreams(JsonObject rootJson, DataStreamDirection direction) { + final JsonElement streamsJson = rootJson.get(direction.configurationKey()); + return streamsJson == null + ? Stream.empty() + : DataStreamUtils.mapJsonToStreams(streamsJson, direction); + } + + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java index f18f2175..460d7100 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java @@ -21,10 +21,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; import com.google.gson.JsonObject; -import io.vavr.control.Either; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; /** diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java index 9d703bb3..7ae92baf 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java @@ -20,12 +20,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.*; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSourceParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSourceParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @since March 2019 + * @since 1.1.4 */ public final class StreamFromGsonParsers { diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java index 3467c809..69016ed8 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java @@ -26,6 +26,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; /** * A generic data stream parser which parses {@code T} to data stream {@code S}. @@ -33,7 +34,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Dat * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @param <T> input data type, eg. Gson Object * @param <S> output data type - * @since 1.1.3 + * @since 1.1.4 */ @ExperimentalApi public interface StreamParser<T, S extends DataStream> { @@ -44,7 +45,7 @@ public interface StreamParser<T, S extends DataStream> { * @param input - the input data * @return Right(parsing result) or Left(parsing error) */ - default Either<StreamParserError, S> parse(T input) { + default Either<StreamParserError, S> parse(RawDataStream<T> input) { return Try.of(() -> unsafeParse(input)) .toEither() .mapLeft(StreamParserError::fromThrowable); @@ -58,7 +59,7 @@ public interface StreamParser<T, S extends DataStream> { * @return parsing result * @throws StreamParsingException when parsing was unsuccessful */ - default S unsafeParse(T input) { + default S unsafeParse(RawDataStream<T> input) { return parse(input).getOrElseThrow(StreamParsingException::new); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java index 05bfc9be..9be08e3c 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java @@ -24,9 +24,9 @@ import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URL; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import reactor.core.publisher.Mono; public class CbsClientImpl implements CbsClient { diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java index 53d0bd34..89daebc8 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java @@ -22,12 +22,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; import com.google.gson.JsonArray; import com.google.gson.JsonObject; - import java.net.InetSocketAddress; - +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import reactor.core.publisher.Mono; /** diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java new file mode 100644 index 00000000..d34b1440 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.vavr.collection.Stream; +import java.io.IOException; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +public final class DataStreamUtils { + + public static Stream<RawDataStream<JsonObject>> mapJsonToStreams(JsonElement streamsJson, + DataStreamDirection direction) { + return Stream.ofAll(streamsJson.getAsJsonObject().entrySet()) + .map(namedSinkJson -> { + final JsonObject jsonObject = namedSinkJson.getValue().getAsJsonObject(); + return rawDataStream(namedSinkJson.getKey(), direction, jsonObject); + }); + } + + public static void assertStreamType( + RawDataStream<JsonObject> json, + String expectedType, + DataStreamDirection expectedDirection) { + if (!json.type().equals(expectedType)) { + throw new IllegalArgumentException( + "Invalid stream type. Expected '" + expectedType + "', but was '" + json.type() + "'"); + } + if (json.direction() != expectedDirection) { + throw new IllegalArgumentException( + "Invalid stream direction. Expected '" + expectedDirection + "', but was '" + json.direction() + + "'"); + } + } + + public static RawDataStream<JsonObject> readSourceFromResource(String resource) throws IOException { + return rawDataStream(resource, DataStreamDirection.SOURCE, GsonUtils.readObjectFromResource(resource)); + } + + public static RawDataStream<JsonObject> readSinkFromResource(String resource) throws IOException { + return rawDataStream(resource, DataStreamDirection.SINK, GsonUtils.readObjectFromResource(resource)); + } + + private static RawDataStream<JsonObject> rawDataStream(String name, DataStreamDirection direction, JsonObject json) { + return ImmutableRawDataStream.<JsonObject>builder() + .name(name) + .direction(direction) + .type(GsonUtils.requiredString(json, "type")) + .descriptor(json) + .build(); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java index a0880165..0b662286 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java @@ -26,14 +26,14 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import io.vavr.Lazy; - +import io.vavr.control.Option; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.util.Map.Entry; import java.util.stream.Collectors; - -import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.GsonAdaptersMessageRouterDmaapInfo; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.GsonAdaptersKafkaInfo; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSource; @@ -42,7 +42,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since March 2019 */ -final class GsonUtils { +public final class GsonUtils { + private static final Lazy<Gson> GSON = Lazy.of(() -> { GsonBuilder gsonBuilder = new GsonBuilder(); gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersKafkaInfo()); @@ -56,39 +57,35 @@ final class GsonUtils { private GsonUtils() { } - static Gson gsonInstance() { + public static Gson gsonInstance() { return GSON.get(); } - static void assertStreamType(JsonObject json, String expectedType) { - final String actualType = requiredString(json, "type"); - if (!actualType.equals(expectedType)) { - throw new IllegalArgumentException("Invalid stream type. Expected '" + expectedType + "', but was '" + actualType + "'"); - } - } - - static String requiredString(JsonObject parent, String childName) { + public static String requiredString(JsonObject parent, String childName) { return requiredChild(parent, childName).getAsString(); } - static Option<String> optionalString(JsonObject parent, String childName) { + public static Option<String> optionalString(JsonObject parent, String childName) { return Option.of(parent.get(childName).getAsString()); } - static JsonElement requiredChild(JsonObject parent, String childName) { - if (parent.has(childName)) { - return parent.get(childName); - } else { - throw new IllegalArgumentException( - "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + stringifyChildrenNames(parent)); - } + public static JsonElement requiredChild(JsonObject parent, String childName) { + return optionalChild(parent, childName) + .getOrElseThrow(() -> new IllegalArgumentException( + "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + + stringifyChildrenNames(parent))); + + } + + public static Option<JsonElement> optionalChild(JsonObject parent, String childName) { + return Option.of(parent.get(childName)); } - static JsonObject readObjectFromResource(String resource) throws IOException { + public static JsonObject readObjectFromResource(String resource) throws IOException { return readFromResource(resource).getAsJsonObject(); } - static JsonElement readFromResource(String resource) throws IOException { + public static JsonElement readFromResource(String resource) throws IOException { try (Reader reader = new InputStreamReader(GsonUtils.class.getResourceAsStream(resource))) { return new JsonParser().parse(reader); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java index 468b6180..42b86aee 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java @@ -17,19 +17,23 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; - /** * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> */ @@ -45,12 +49,12 @@ public final class DataRouterSinkParser implements StreamFromGsonParser<DataRout this.gson = gson; } - public DataRouterSink unsafeParse(JsonObject input) { - assertStreamType(input, DATA_ROUTER_TYPE); - - final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME); + @Override + public DataRouterSink unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SINK); - return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class); + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); + return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class).withName(input.name()); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java index d78c3dde..7d29a90f 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java @@ -17,19 +17,23 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSource; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSource; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; - /** * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> */ @@ -45,12 +49,12 @@ public final class DataRouterSourceParser implements StreamFromGsonParser<DataRo this.gson = gson; } - public DataRouterSource unsafeParse(JsonObject input) { - assertStreamType(input, DATA_ROUTER_TYPE); - - final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME); + @Override + public DataRouterSource unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SOURCE); - return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class); + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); + return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class).withName(input.name()); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java index 10c00e72..c5d254f0 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -31,15 +31,21 @@ import java.util.Objects; */ abstract class GsonMessageRouter implements MessageRouter { + private final String name; private final MessageRouterDmaapInfo dmaapInfo; private final AafCredentials aafCredentials; - GsonMessageRouter(@NotNull MessageRouterDmaapInfo dmaapInfo, - @Nullable AafCredentials aafCredentials) { + GsonMessageRouter(String name, @NotNull MessageRouterDmaapInfo dmaapInfo, + @Nullable AafCredentials aafCredentials) { + this.name = name; this.dmaapInfo = Objects.requireNonNull(dmaapInfo, "dmaapInfo"); this.aafCredentials = aafCredentials; } + public String name() { + return name; + } + @Override public String topicUrl() { return dmaapInfo.topicUrl(); diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java index da218420..5eef48d9 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -30,8 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma public class GsonMessageRouterSink extends GsonMessageRouter implements MessageRouterSink { GsonMessageRouterSink( - @NotNull MessageRouterDmaapInfo dmaapInfo, + String name, @NotNull MessageRouterDmaapInfo dmaapInfo, @Nullable AafCredentials aafCredentials) { - super(dmaapInfo, aafCredentials); + super(name, dmaapInfo, aafCredentials); } }
\ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java index b69c53db..d93a1d50 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -30,8 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma public class GsonMessageRouterSource extends GsonMessageRouter implements MessageRouterSource { GsonMessageRouterSource( - @NotNull MessageRouterDmaapInfo dmaapInfo, + String name, @NotNull MessageRouterDmaapInfo dmaapInfo, @Nullable AafCredentials aafCredentials) { - super(dmaapInfo, aafCredentials); + super(name, dmaapInfo, aafCredentials); } }
\ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java index ced5ad55..0ce0f80e 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; import com.google.gson.annotations.SerializedName; import org.immutables.gson.Gson; diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java index 56f53932..1f518fe8 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java @@ -17,20 +17,24 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; - public final class MessageRouterSinkParser implements StreamFromGsonParser<MessageRouterSink> { private final Gson gson; @@ -43,15 +47,16 @@ public final class MessageRouterSinkParser implements StreamFromGsonParser<Messa this.gson = gson; } - public MessageRouterSink unsafeParse(JsonObject input) { - assertStreamType(input, MESSAGE_ROUTER_TYPE); + @Override + public MessageRouterSink unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SINK); - final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class); + final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class); - final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME); + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class); - return new GsonMessageRouterSink(dmaapInfo, aafCredentials); + return new GsonMessageRouterSink(input.name(), dmaapInfo, aafCredentials); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java index 25cf9e0e..c6c1b22c 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java @@ -17,20 +17,24 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; - /** * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> */ @@ -46,15 +50,16 @@ public final class MessageRouterSourceParser implements StreamFromGsonParser<Mes this.gson = gson; } - public MessageRouterSource unsafeParse(JsonObject input) { - assertStreamType(input, MESSAGE_ROUTER_TYPE); + @Override + public MessageRouterSource unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SOURCE); - final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class); + final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class); - final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME); + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class); - return new GsonMessageRouterSource(dmaapInfo, aafCredentials); + return new GsonMessageRouterSource(input.name(), dmaapInfo, aafCredentials); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java index 2ad37a84..ad9b021e 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java @@ -18,7 +18,7 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import java.util.Objects; import org.jetbrains.annotations.NotNull; @@ -32,15 +32,23 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma */ abstract class GsonKafka implements Kafka { - protected final KafkaInfo kafkaInfo; + private final String name; + final KafkaInfo kafkaInfo; private final AafCredentials aafCredentials; - GsonKafka(@NotNull KafkaInfo kafkaInfo, + GsonKafka( + @NotNull String name, + @NotNull KafkaInfo kafkaInfo, @Nullable AafCredentials aafCredentials) { + this.name = Objects.requireNonNull(name, "name"); this.kafkaInfo = Objects.requireNonNull(kafkaInfo, "kafkaInfo"); this.aafCredentials = aafCredentials; } + public String name() { + return name; + } + @Override public String bootstrapServers() { return kafkaInfo.bootstrapServers(); diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java index c45f8470..4990f80a 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java @@ -18,7 +18,7 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -32,8 +32,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma class GsonKafkaSink extends GsonKafka implements KafkaSink { GsonKafkaSink( + @NotNull String name, @NotNull KafkaInfo kafkaInfo, @Nullable AafCredentials aafCredentials) { - super(kafkaInfo, aafCredentials); + super(name, kafkaInfo, aafCredentials); } + } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java index 1509d9d7..137964c3 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java @@ -18,7 +18,7 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -32,9 +32,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma class GsonKafkaSource extends GsonKafka implements KafkaSource { GsonKafkaSource( + @NotNull String name, @NotNull KafkaInfo kafkaInfo, @Nullable AafCredentials aafCredentials) { - super(kafkaInfo, aafCredentials); + super(name, kafkaInfo, aafCredentials); } @Override diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java index 8b17a8d4..fd5602e6 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java @@ -18,7 +18,7 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import com.google.gson.annotations.SerializedName; import org.immutables.gson.Gson; diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java index f9a546c2..59373c45 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java @@ -18,16 +18,21 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo; import com.google.gson.Gson; -import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE; /** @@ -46,12 +51,13 @@ public final class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> { } @Override - public KafkaSink unsafeParse(JsonObject input) { - assertStreamType(input, KAFKA_TYPE); + public KafkaSink unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SINK); + final JsonObject json = input.descriptor(); - final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME); - final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json); + final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull(); - return new GsonKafkaSink(kafkaInfo, null); + return new GsonKafkaSink(input.name(), kafkaInfo, aafCreds); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java index 08c02b47..6ac1dc99 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java @@ -18,16 +18,21 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo; import com.google.gson.Gson; -import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE; /** @@ -46,12 +51,13 @@ public final class KafkaSourceParser implements StreamFromGsonParser<KafkaSource } @Override - public KafkaSource unsafeParse(JsonObject input) { - assertStreamType(input, KAFKA_TYPE); + public KafkaSource unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SOURCE); + final JsonObject json = input.descriptor(); - final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME); - final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json); + final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull(); - return new GsonKafkaSource(kafkaInfo, null); + return new GsonKafkaSource(input.name(), kafkaInfo, aafCreds); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java new file mode 100644 index 00000000..4cfa33ac --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.optionalChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +final class KafkaUtils { + + private KafkaUtils() { + } + + static KafkaInfo extractKafkaInfo(Gson gson, JsonObject input) { + final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info"); + return gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + } + + static Option<AafCredentials> extractAafCredentials(Gson gson, JsonObject input) { + return optionalChild(input, "aaf_credentials") + .map(aafCredsJson -> gson.fromJson(aafCredsJson, ImmutableAafCredentials.class)); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java index e8d63192..c3c70b78 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java @@ -36,9 +36,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; @Gson.TypeAdapters public interface AafCredentials { - @SerializedName("aaf_username") + @SerializedName(value = "username", alternate = "aaf_username") @Nullable String username(); - @SerializedName("aaf_password") + @SerializedName(value = "password", alternate = "aaf_password") @Nullable String password(); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java index 43d9d726..37bf7e57 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; +import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; /** @@ -28,5 +29,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; */ @ExperimentalApi public interface DataStream { - + @Value.Default + default String name() { + return ""; + } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java new file mode 100644 index 00000000..f3cac547 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +public enum DataStreamDirection { + + SINK("streams_publishes"), + SOURCE("streams_subscribes"); + + private final String configurationKey; + + DataStreamDirection(String configurationKey) { + this.configurationKey = configurationKey; + } + + public String configurationKey() { + return configurationKey; + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java new file mode 100644 index 00000000..7a39ede5 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; + +import org.immutables.value.Value; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +@Value.Immutable +public interface RawDataStream<T> { + String name(); + String type(); + DataStreamDirection direction(); + T descriptor(); +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java index 97f07a29..1810fc6c 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java @@ -20,6 +20,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; +import static io.vavr.Predicates.not; + +import io.vavr.collection.List; import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; @@ -46,4 +49,9 @@ public interface Kafka { default int maxPayloadSizeBytes() { return 1024 * 1024; } + + @Value.Derived + default List<String> bootstrapServerList() { + return List.of(bootstrapServers().split(",")).filter(not(String::isEmpty)); + } } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java index c9ceeaf1..8a5edcc9 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java @@ -20,17 +20,15 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; +import static org.assertj.core.api.Assertions.assertThat; + import com.google.gson.JsonArray; import com.google.gson.JsonObject; import io.vavr.collection.List; +import java.math.BigInteger; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; -import java.math.BigInteger; - - -import static org.assertj.core.api.Assertions.assertThat; - class MerkleTreeParserTest { private final MerkleTreeParser cut = new MerkleTreeParser(); diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java index e862d849..e2833fe5 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java @@ -20,20 +20,29 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; +import static org.assertj.core.api.Assertions.assertThat; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendResource; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendString; import com.google.gson.JsonObject; +import io.vavr.collection.Map; import io.vavr.collection.Stream; import java.time.Duration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -128,6 +137,91 @@ class CbsClientImplIT { .verify(Duration.ofSeconds(5)); } + @Test + void testCbsClientWithStreamsParsing() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); + final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + + // when + final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)) + .map(json -> + DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head() + ); + + // then + StepVerifier.create(result) + .consumeNextWith(kafkaSink -> { + assertThat(kafkaSink.name()).isEqualTo("perf3gpp"); + assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060"); + assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP"); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithStreamsParsingUsingSwitch() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + // TODO: Use these parsers below + final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); + final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser(); + + // when + final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)) + .map(json -> { + final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json) + .groupBy(RawDataStream::type); + + final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty()) + .map(kafkaSinkParser::unsafeParse); + final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty()) + .map(mrSinkParser::unsafeParse); + + assertThat(allKafkaSinks.size()) + .describedAs("Number of kafka sinks") + .isEqualTo(2); + assertThat(allMrSinks.size()) + .describedAs("Number of DMAAP-MR sinks") + .isEqualTo(1); + + return true; + }) + .then(); + + // then + StepVerifier.create(result) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithStreamsParsingWhenUsingInvalidParser() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser(); + final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + + // when + final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)) + .map(json -> + DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head() + ); + + // then + StepVerifier.create(result) + .expectErrorSatisfies(ex -> { + assertThat(ex).isInstanceOf(IllegalArgumentException.class); + assertThat(ex).hasMessageContaining("Invalid stream type"); + assertThat(ex).hasMessageContaining("message_router"); + assertThat(ex).hasMessageContaining("kafka"); + }) + .verify(Duration.ofSeconds(5)); + } + private String sampleConfigValue(JsonObject obj) { return obj.get(SAMPLE_CONFIG_KEY).getAsString(); } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java index 9fd7cc88..617904f9 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java @@ -30,8 +30,8 @@ import static org.mockito.Mockito.verify; import com.google.gson.JsonObject; import java.net.InetSocketAddress; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import reactor.core.publisher.Mono; /** diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java index 6843e0e3..94ff53f9 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java @@ -31,9 +31,9 @@ import com.google.gson.JsonParser; import java.io.InputStreamReader; import java.net.InetSocketAddress; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java index d0485f57..7835a5f9 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java @@ -21,9 +21,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; import io.vavr.CheckedFunction0; -import io.vavr.Function0; -import java.io.IOException; -import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java deleted file mode 100644 index 87131285..00000000 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * ============LICENSE_START==================================== - * DCAEGEN2-SERVICES-SDK - * ========================================================= - * Copyright (C) 2019 Nokia. All rights reserved. - * ========================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END===================================== - */ - -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.google.gson.JsonObject; -import java.io.IOException; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; - -/** - * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @since March 2019 - */ -class KafkaSourceParserTest { - - private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser(); - - @Test - void precondition_assureInstanceOf() { - assertThat(cut).isInstanceOf(KafkaSourceParser.class); - } - - @Test - void shouldParseMinimalKafkaSourceDefinition() throws IOException { - // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_source_minimal.json"); - - // when - final KafkaSource result = cut.unsafeParse(input); - - // then - assertThat(result.aafCredentials()).isNull(); - assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); - assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); - assertThat(result.clientId()).isNull(); - assertThat(result.clientRole()).isNull(); - } -}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java index 398ebcd9..7092de5a 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; import com.google.gson.Gson; import com.google.gson.JsonObject; @@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink; @@ -37,6 +41,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea class DataRouterSinkParserTest { + private static final String SAMPLE_LOCATION = "mtc00"; private static final String SAMPLE_PUBLISH_URL = "https://we-are-data-router.us/feed/xyz"; private static final String SAMPLE_LOG_URL = "https://we-are-data-router.us/feed/xyz/logs"; @@ -44,49 +49,54 @@ class DataRouterSinkParserTest { private static final String SAMPLE_PASSWORD = "some-password"; private static final String SAMPLE_PUBLISHER_ID = "123456"; - private static final Gson gson = new Gson(); - private final StreamFromGsonParser<DataRouterSink> streamParser = StreamFromGsonParsers.dataRouterSinkParser(); - private static final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder() - .location(SAMPLE_LOCATION) - .publishUrl(SAMPLE_PUBLISH_URL) - .logUrl(SAMPLE_LOG_URL) - .username(SAMPLE_USER) - .password(SAMPLE_PASSWORD) - .publisherId(SAMPLE_PUBLISHER_ID) - .build(); - - private static final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder() - .publishUrl(SAMPLE_PUBLISH_URL) - .build(); - @Test void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json"); + // when DataRouterSink result = streamParser.unsafeParse(input); + // then + final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder() + .name(input.name()) + .location(SAMPLE_LOCATION) + .publishUrl(SAMPLE_PUBLISH_URL) + .logUrl(SAMPLE_LOG_URL) + .username(SAMPLE_USER) + .password(SAMPLE_PASSWORD) + .publisherId(SAMPLE_PUBLISHER_ID) + .build(); assertThat(result).isEqualTo(fullConfigurationStream); } @Test void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { //given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_minimal.json"); + RawDataStream<JsonObject> input = DataStreamUtils + .readSinkFromResource("/streams/data_router_sink_minimal.json"); + // when DataRouterSink result = streamParser.unsafeParse(input); + // then + final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder() + .name(input.name()) + .publishUrl(SAMPLE_PUBLISH_URL) + .build(); assertThat(result).isEqualTo(minimalConfigurationStream); } @Test void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json"); + // when Either<StreamParserError, DataRouterSink> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); result.peekLeft(error -> { @@ -100,9 +110,17 @@ class DataRouterSinkParserTest { @Test void emptyConfiguration_shouldParseToStreamParserError() { // given - JsonObject input = gson.fromJson("{}", JsonObject.class); + JsonObject json = new JsonObject(); + final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder() + .name("empty") + .type("data_router") + .descriptor(json) + .direction(DataStreamDirection.SINK) + .build(); + // when Either<StreamParserError, DataRouterSink> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java index 681fa147..b2d01309 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -28,7 +28,11 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.St import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*; import java.io.IOException; @@ -38,54 +42,60 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; public class DataRouterSourceParserTest { + private static final String SAMPLE_LOCATION = "mtc00"; private static final String SAMPLE_DELIVERY_URL = "https://my-subscriber-app.dcae:8080/target-path"; private static final String SAMPLE_USER = "some-user"; private static final String SAMPLE_PASSWORD = "some-password"; private static final String SAMPLE_SUBSCRIBER_ID = "789012"; - private static final Gson gson = new Gson(); - - private static final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder() - .location(SAMPLE_LOCATION) - .deliveryUrl(SAMPLE_DELIVERY_URL) - .username(SAMPLE_USER) - .password(SAMPLE_PASSWORD) - .subscriberId(SAMPLE_SUBSCRIBER_ID) - .build(); - - private static final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder() - .build(); - - private final StreamFromGsonParser<DataRouterSource> streamParser = StreamFromGsonParsers.dataRouterSourceParser(); @Test void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/data_router_source_full.json"); + // when DataRouterSource result = streamParser.unsafeParse(input); + // then + + final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder() + .name(input.name()) + .location(SAMPLE_LOCATION) + .deliveryUrl(SAMPLE_DELIVERY_URL) + .username(SAMPLE_USER) + .password(SAMPLE_PASSWORD) + .subscriberId(SAMPLE_SUBSCRIBER_ID) + .build(); assertThat(result).isEqualTo(fullConfigurationStream); } @Test void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_minimal.json"); + RawDataStream<JsonObject> input = DataStreamUtils + .readSourceFromResource("/streams/data_router_source_minimal.json"); + // when DataRouterSource result = streamParser.unsafeParse(input); + // then + final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder() + .name(input.name()) + .build(); assertThat(result).isEqualTo(minimalConfigurationStream); } @Test void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json"); + // when Either<StreamParserError, DataRouterSource> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); result.peekLeft(error -> { @@ -99,9 +109,17 @@ public class DataRouterSourceParserTest { @Test void emptyConfiguration_shouldBeParsedToStreamParserError() { // given - JsonObject input = gson.fromJson("{}", JsonObject.class); + JsonObject json = new JsonObject(); + final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder() + .name("empty") + .type("data_router") + .descriptor(json) + .direction(DataStreamDirection.SOURCE) + .build(); + // when Either<StreamParserError, DataRouterSource> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java index 63b04d1d..4d3b88b8 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; import com.google.gson.Gson; import com.google.gson.JsonObject; @@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; @@ -48,16 +52,16 @@ public class MessageRouterSinkParserTest { private static final String SAMPLE_CLIENT_ID = "1500462518108"; private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic"; - private static final Gson gson = new Gson(); - private final StreamFromGsonParser<MessageRouterSink> streamParser = StreamFromGsonParsers.messageRouterSinkParser(); @Test void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json"); + // when MessageRouterSink result = streamParser.unsafeParse(input); + // then assertThat(result).isInstanceOf(MessageRouterSink.class); assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME); @@ -71,10 +75,11 @@ public class MessageRouterSinkParserTest { @Test void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_minimal.json"); // when MessageRouterSink result = streamParser.unsafeParse(input); + // then assertThat(result).isInstanceOf(MessageRouterSink.class); assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); @@ -86,9 +91,11 @@ public class MessageRouterSinkParserTest { @Test void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json"); + // when Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); result.peekLeft(error -> { @@ -102,9 +109,17 @@ public class MessageRouterSinkParserTest { @Test void emptyConfiguration_shouldParseToStreamParserError() { // given - JsonObject input = gson.fromJson("{}", JsonObject.class); + JsonObject json = new JsonObject(); + final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder() + .name("empty") + .type("data_router") + .descriptor(json) + .direction(DataStreamDirection.SINK) + .build(); + // when Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java index fea63d66..d497817f 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; import com.google.gson.Gson; import com.google.gson.JsonObject; @@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource; @@ -48,16 +52,16 @@ public class MessageRouterSourceParserTest { private static final String SAMPLE_CLIENT_ID = "1500462518108"; private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic"; - private static final Gson gson = new Gson(); - private final StreamFromGsonParser<MessageRouterSource> streamParser = StreamFromGsonParsers.messageRouterSourceParser(); @Test void fullConfiguration_shouldGenerateDataRouterSourceObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json"); + // when MessageRouterSource result = streamParser.unsafeParse(input); + // then assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME); assertThat(result.aafCredentials().password()).isEqualTo(SAMPLE_AAF_PASSWORD); @@ -70,10 +74,11 @@ public class MessageRouterSourceParserTest { @Test void minimalConfiguration_shouldGenerateDataRouterSourceObject() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_minimal.json"); // when MessageRouterSource result = streamParser.unsafeParse(input); + // then assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); assertThat(result.aafCredentials().username()).isNull(); @@ -84,9 +89,11 @@ public class MessageRouterSourceParserTest { @Test void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/data_router_sink_full.json"); + // when Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input); + // then assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); result.peekLeft(error -> { @@ -100,7 +107,13 @@ public class MessageRouterSourceParserTest { @Test void emptyConfiguration_shouldParseToStreamParserError() { // given - JsonObject input = gson.fromJson("{}", JsonObject.class); + JsonObject json = new JsonObject(); + final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder() + .name("empty") + .type("data_router") + .descriptor(json) + .direction(DataStreamDirection.SOURCE) + .build(); // when Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input); // then diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java index b5481203..5974639c 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java @@ -18,22 +18,21 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.in; -import static org.junit.jupiter.api.Assertions.*; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import io.vavr.Function1; import io.vavr.control.Either; import java.io.IOException; -import java.io.InputStreamReader; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; /** @@ -52,7 +51,7 @@ class KafkaSinkParserTest { @Test void shouldParseMinimalKafkaSinkDefinition() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_minimal.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_minimal.json"); // when final KafkaSink result = cut.unsafeParse(input); @@ -66,15 +65,19 @@ class KafkaSinkParserTest { } @Test - void shouldParseBasicKafkaSinkDefinition() throws IOException { + void shouldParseFullKafkaSinkDefinition() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink.json"); // when final KafkaSink result = cut.unsafeParse(input); // then - assertThat(result.aafCredentials()).isNull(); + final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder() + .username("the user") + .password("the passwd") + .build(); + assertThat(result.aafCredentials()).isEqualTo(expectedCredentials); assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); assertThat(result.clientId()).isEqualTo("1500462518108"); @@ -84,7 +87,7 @@ class KafkaSinkParserTest { @Test void shouldReturnErrorWhenStructureIsWrong() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_missing_child.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_missing_child.json"); // when final Either<StreamParserError, KafkaSink> result = cut.parse(input); @@ -99,7 +102,7 @@ class KafkaSinkParserTest { @Test void shouldReturnErrorWhenTypeIsWrong() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_invalid_type.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_invalid_type.json"); // when final Either<StreamParserError, KafkaSink> result = cut.parse(input); @@ -112,4 +115,19 @@ class KafkaSinkParserTest { assertThat(error.message()).containsIgnoringCase("message_router"); }); } + + @Test + void shouldReturnErrorWhenDirectionIsWrong() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_sink.json"); + + // when + final Either<StreamParserError, KafkaSink> result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream direction"); + }); + } }
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java new file mode 100644 index 00000000..d255d99a --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java @@ -0,0 +1,120 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.gson.JsonObject; +import io.vavr.collection.List; +import io.vavr.control.Either; +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +class KafkaSourceParserTest { + + private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser(); + + @Test + void precondition_assureInstanceOf() { + assertThat(cut).isInstanceOf(KafkaSourceParser.class); + } + + @Test + void shouldParseMinimalKafkaSourceDefinition() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source_minimal.json"); + + // when + final KafkaSource result = cut.unsafeParse(input); + + // then + assertThat(result.aafCredentials()).isNull(); + assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); + assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); + assertThat(result.clientId()).isNull(); + assertThat(result.clientRole()).isNull(); + } + + @Test + void shouldParseFullKafkaSourceDefinition() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source.json"); + + // when + final KafkaSource result = cut.unsafeParse(input); + + // then + final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder() + .username("the user") + .password("the passwd") + .build(); + assertThat(result.aafCredentials()).isEqualTo(expectedCredentials); + assertThat(result.bootstrapServerList()).isEqualTo(List.of("dmaap-mr-kafka-0:6060", "dmaap-mr-kafka-1:6060")); + assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); + assertThat(result.consumerGroupId()).isEqualTo("nokia-perf3gpp-processor"); + assertThat(result.clientId()).isEqualTo("1500462518108"); + assertThat(result.clientRole()).isEqualTo("com.dcae.member"); + } + + @Test + void shouldReturnErrorWhenTypeIsWrong() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_invalid_type.json"); + + // when + final Either<StreamParserError, KafkaSource> result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream type"); + assertThat(error.message()).containsIgnoringCase("kafka"); + assertThat(error.message()).containsIgnoringCase("message_router"); + }); + } + + @Test + void shouldReturnErrorWhenDirectionIsWrong() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_source.json"); + + // when + final Either<StreamParserError, KafkaSource> result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream direction"); + }); + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/resources/sample_config.json b/rest-services/cbs-client/src/test/resources/sample_config.json index a95b723f..266326f4 100644 --- a/rest-services/cbs-client/src/test/resources/sample_config.json +++ b/rest-services/cbs-client/src/test/resources/sample_config.json @@ -1,3 +1,33 @@ { - "keystore.path": "/var/run/security/keystore.p12" + "keystore.path": "/var/run/security/keystore.p12", + "streams_publishes": { + "perf3gpp": { + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka:6060", + "topic_name": "HVVES_PERF3GPP" + } + }, + "pnf_ready": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://message-router:3904/events/VES_PNF_READY" + } + }, + "call_trace": { + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka:6060", + "topic_name": "HVVES_TRACE" + } + } + }, + "streams_subscribes": { + "measurements": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://message-router:3904/events/VES_MEASUREMENT" + } + } + } } diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json b/rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json index 0ee88adb..0ee88adb 100644 --- a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json index b60388d5..e7b45508 100644 --- a/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json @@ -1,5 +1,9 @@ { "type": "kafka", + "aaf_credentials": { + "username": "the user", + "password": "the passwd" + }, "kafka_info": { "client_role": "com.dcae.member", "client_id": "1500462518108", diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_source.json b/rest-services/cbs-client/src/test/resources/streams/kafka_source.json new file mode 100644 index 00000000..379dbef1 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_source.json @@ -0,0 +1,14 @@ +{ + "type": "kafka", + "aaf_credentials": { + "username": "the user", + "password": "the passwd" + }, + "kafka_info": { + "client_role": "com.dcae.member", + "client_id": "1500462518108", + "bootstrap_servers": "dmaap-mr-kafka-0:6060,,dmaap-mr-kafka-1:6060,", + "topic_name": "HVVES_PERF3GPP", + "consumer_group_id": "nokia-perf3gpp-processor" + } +} |