diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-03-14 14:34:35 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-03-19 15:06:15 +0100 |
commit | 565ec734f46a15bb3c87fe02a32613fc86c0eb22 (patch) | |
tree | 71412d433e9e952db2e2f9db6044dc1a9643e9c0 /rest-services/cbs-client/src/main/java/org | |
parent | 9b03823dc6ac4bec2e61a4e54d114a518dbb1100 (diff) |
Kafka streams parsers - additions
Change-Id: I98ca661682b41d76d3de668d6faeb6ebe02f92a8
Issue-ID: DCAEGEN2-1341
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'rest-services/cbs-client/src/main/java/org')
30 files changed, 436 insertions, 119 deletions
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java index 36589dad..989bd2db 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java @@ -20,9 +20,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsLookup; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import reactor.core.publisher.Mono; diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java index 15c4eea2..dfd0e2f7 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java @@ -19,17 +19,16 @@ */ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; +import static java.lang.String.valueOf; + import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.vavr.collection.List; -import org.jetbrains.annotations.NotNull; - import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; - -import static java.lang.String.valueOf; +import org.jetbrains.annotations.NotNull; /** diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java new file mode 100644 index 00000000..4fdb31b1 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.vavr.collection.Stream; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +@ExperimentalApi +public final class DataStreams { + + private DataStreams() { + } + + public static Stream<RawDataStream<JsonObject>> namedSources(JsonObject rootJson) { + return createCollectionOfStreams(rootJson, DataStreamDirection.SOURCE); + } + + public static Stream<RawDataStream<JsonObject>> namedSinks(JsonObject rootJson) { + return createCollectionOfStreams(rootJson, DataStreamDirection.SINK); + } + + private static Stream<RawDataStream<JsonObject>> createCollectionOfStreams(JsonObject rootJson, DataStreamDirection direction) { + final JsonElement streamsJson = rootJson.get(direction.configurationKey()); + return streamsJson == null + ? Stream.empty() + : DataStreamUtils.mapJsonToStreams(streamsJson, direction); + } + + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java index f18f2175..460d7100 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java @@ -21,10 +21,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; import com.google.gson.JsonObject; -import io.vavr.control.Either; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; /** diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java index 9d703bb3..7ae92baf 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java @@ -20,12 +20,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.*; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSourceParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSourceParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @since March 2019 + * @since 1.1.4 */ public final class StreamFromGsonParsers { diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java index 3467c809..69016ed8 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java @@ -26,6 +26,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; /** * A generic data stream parser which parses {@code T} to data stream {@code S}. @@ -33,7 +34,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Dat * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @param <T> input data type, eg. Gson Object * @param <S> output data type - * @since 1.1.3 + * @since 1.1.4 */ @ExperimentalApi public interface StreamParser<T, S extends DataStream> { @@ -44,7 +45,7 @@ public interface StreamParser<T, S extends DataStream> { * @param input - the input data * @return Right(parsing result) or Left(parsing error) */ - default Either<StreamParserError, S> parse(T input) { + default Either<StreamParserError, S> parse(RawDataStream<T> input) { return Try.of(() -> unsafeParse(input)) .toEither() .mapLeft(StreamParserError::fromThrowable); @@ -58,7 +59,7 @@ public interface StreamParser<T, S extends DataStream> { * @return parsing result * @throws StreamParsingException when parsing was unsuccessful */ - default S unsafeParse(T input) { + default S unsafeParse(RawDataStream<T> input) { return parse(input).getOrElseThrow(StreamParsingException::new); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java index 05bfc9be..9be08e3c 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java @@ -24,9 +24,9 @@ import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URL; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import reactor.core.publisher.Mono; public class CbsClientImpl implements CbsClient { diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java index 53d0bd34..89daebc8 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java @@ -22,12 +22,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; import com.google.gson.JsonArray; import com.google.gson.JsonObject; - import java.net.InetSocketAddress; - +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import reactor.core.publisher.Mono; /** diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java new file mode 100644 index 00000000..d34b1440 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.vavr.collection.Stream; +import java.io.IOException; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +public final class DataStreamUtils { + + public static Stream<RawDataStream<JsonObject>> mapJsonToStreams(JsonElement streamsJson, + DataStreamDirection direction) { + return Stream.ofAll(streamsJson.getAsJsonObject().entrySet()) + .map(namedSinkJson -> { + final JsonObject jsonObject = namedSinkJson.getValue().getAsJsonObject(); + return rawDataStream(namedSinkJson.getKey(), direction, jsonObject); + }); + } + + public static void assertStreamType( + RawDataStream<JsonObject> json, + String expectedType, + DataStreamDirection expectedDirection) { + if (!json.type().equals(expectedType)) { + throw new IllegalArgumentException( + "Invalid stream type. Expected '" + expectedType + "', but was '" + json.type() + "'"); + } + if (json.direction() != expectedDirection) { + throw new IllegalArgumentException( + "Invalid stream direction. Expected '" + expectedDirection + "', but was '" + json.direction() + + "'"); + } + } + + public static RawDataStream<JsonObject> readSourceFromResource(String resource) throws IOException { + return rawDataStream(resource, DataStreamDirection.SOURCE, GsonUtils.readObjectFromResource(resource)); + } + + public static RawDataStream<JsonObject> readSinkFromResource(String resource) throws IOException { + return rawDataStream(resource, DataStreamDirection.SINK, GsonUtils.readObjectFromResource(resource)); + } + + private static RawDataStream<JsonObject> rawDataStream(String name, DataStreamDirection direction, JsonObject json) { + return ImmutableRawDataStream.<JsonObject>builder() + .name(name) + .direction(direction) + .type(GsonUtils.requiredString(json, "type")) + .descriptor(json) + .build(); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java index a0880165..0b662286 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java @@ -26,14 +26,14 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import io.vavr.Lazy; - +import io.vavr.control.Option; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.util.Map.Entry; import java.util.stream.Collectors; - -import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.GsonAdaptersMessageRouterDmaapInfo; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.GsonAdaptersKafkaInfo; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSource; @@ -42,7 +42,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since March 2019 */ -final class GsonUtils { +public final class GsonUtils { + private static final Lazy<Gson> GSON = Lazy.of(() -> { GsonBuilder gsonBuilder = new GsonBuilder(); gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersKafkaInfo()); @@ -56,39 +57,35 @@ final class GsonUtils { private GsonUtils() { } - static Gson gsonInstance() { + public static Gson gsonInstance() { return GSON.get(); } - static void assertStreamType(JsonObject json, String expectedType) { - final String actualType = requiredString(json, "type"); - if (!actualType.equals(expectedType)) { - throw new IllegalArgumentException("Invalid stream type. Expected '" + expectedType + "', but was '" + actualType + "'"); - } - } - - static String requiredString(JsonObject parent, String childName) { + public static String requiredString(JsonObject parent, String childName) { return requiredChild(parent, childName).getAsString(); } - static Option<String> optionalString(JsonObject parent, String childName) { + public static Option<String> optionalString(JsonObject parent, String childName) { return Option.of(parent.get(childName).getAsString()); } - static JsonElement requiredChild(JsonObject parent, String childName) { - if (parent.has(childName)) { - return parent.get(childName); - } else { - throw new IllegalArgumentException( - "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + stringifyChildrenNames(parent)); - } + public static JsonElement requiredChild(JsonObject parent, String childName) { + return optionalChild(parent, childName) + .getOrElseThrow(() -> new IllegalArgumentException( + "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + + stringifyChildrenNames(parent))); + + } + + public static Option<JsonElement> optionalChild(JsonObject parent, String childName) { + return Option.of(parent.get(childName)); } - static JsonObject readObjectFromResource(String resource) throws IOException { + public static JsonObject readObjectFromResource(String resource) throws IOException { return readFromResource(resource).getAsJsonObject(); } - static JsonElement readFromResource(String resource) throws IOException { + public static JsonElement readFromResource(String resource) throws IOException { try (Reader reader = new InputStreamReader(GsonUtils.class.getResourceAsStream(resource))) { return new JsonParser().parse(reader); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java index 468b6180..42b86aee 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java @@ -17,19 +17,23 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; - /** * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> */ @@ -45,12 +49,12 @@ public final class DataRouterSinkParser implements StreamFromGsonParser<DataRout this.gson = gson; } - public DataRouterSink unsafeParse(JsonObject input) { - assertStreamType(input, DATA_ROUTER_TYPE); - - final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME); + @Override + public DataRouterSink unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SINK); - return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class); + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); + return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class).withName(input.name()); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java index d78c3dde..7d29a90f 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java @@ -17,19 +17,23 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSource; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSource; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; - /** * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> */ @@ -45,12 +49,12 @@ public final class DataRouterSourceParser implements StreamFromGsonParser<DataRo this.gson = gson; } - public DataRouterSource unsafeParse(JsonObject input) { - assertStreamType(input, DATA_ROUTER_TYPE); - - final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME); + @Override + public DataRouterSource unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SOURCE); - return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class); + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); + return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class).withName(input.name()); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java index 10c00e72..c5d254f0 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -31,15 +31,21 @@ import java.util.Objects; */ abstract class GsonMessageRouter implements MessageRouter { + private final String name; private final MessageRouterDmaapInfo dmaapInfo; private final AafCredentials aafCredentials; - GsonMessageRouter(@NotNull MessageRouterDmaapInfo dmaapInfo, - @Nullable AafCredentials aafCredentials) { + GsonMessageRouter(String name, @NotNull MessageRouterDmaapInfo dmaapInfo, + @Nullable AafCredentials aafCredentials) { + this.name = name; this.dmaapInfo = Objects.requireNonNull(dmaapInfo, "dmaapInfo"); this.aafCredentials = aafCredentials; } + public String name() { + return name; + } + @Override public String topicUrl() { return dmaapInfo.topicUrl(); diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java index da218420..5eef48d9 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -30,8 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma public class GsonMessageRouterSink extends GsonMessageRouter implements MessageRouterSink { GsonMessageRouterSink( - @NotNull MessageRouterDmaapInfo dmaapInfo, + String name, @NotNull MessageRouterDmaapInfo dmaapInfo, @Nullable AafCredentials aafCredentials) { - super(dmaapInfo, aafCredentials); + super(name, dmaapInfo, aafCredentials); } }
\ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java index b69c53db..d93a1d50 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -30,8 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma public class GsonMessageRouterSource extends GsonMessageRouter implements MessageRouterSource { GsonMessageRouterSource( - @NotNull MessageRouterDmaapInfo dmaapInfo, + String name, @NotNull MessageRouterDmaapInfo dmaapInfo, @Nullable AafCredentials aafCredentials) { - super(dmaapInfo, aafCredentials); + super(name, dmaapInfo, aafCredentials); } }
\ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java index ced5ad55..0ce0f80e 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; import com.google.gson.annotations.SerializedName; import org.immutables.gson.Gson; diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java index 56f53932..1f518fe8 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java @@ -17,20 +17,24 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; - public final class MessageRouterSinkParser implements StreamFromGsonParser<MessageRouterSink> { private final Gson gson; @@ -43,15 +47,16 @@ public final class MessageRouterSinkParser implements StreamFromGsonParser<Messa this.gson = gson; } - public MessageRouterSink unsafeParse(JsonObject input) { - assertStreamType(input, MESSAGE_ROUTER_TYPE); + @Override + public MessageRouterSink unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SINK); - final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class); + final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class); - final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME); + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class); - return new GsonMessageRouterSink(dmaapInfo, aafCredentials); + return new GsonMessageRouterSink(input.name(), dmaapInfo, aafCredentials); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java index 25cf9e0e..c6c1b22c 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java @@ -17,20 +17,24 @@ * limitations under the License. * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE; - /** * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> */ @@ -46,15 +50,16 @@ public final class MessageRouterSourceParser implements StreamFromGsonParser<Mes this.gson = gson; } - public MessageRouterSource unsafeParse(JsonObject input) { - assertStreamType(input, MESSAGE_ROUTER_TYPE); + @Override + public MessageRouterSource unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SOURCE); - final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class); + final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class); - final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME); + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class); - return new GsonMessageRouterSource(dmaapInfo, aafCredentials); + return new GsonMessageRouterSource(input.name(), dmaapInfo, aafCredentials); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java index 2ad37a84..ad9b021e 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java @@ -18,7 +18,7 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import java.util.Objects; import org.jetbrains.annotations.NotNull; @@ -32,15 +32,23 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma */ abstract class GsonKafka implements Kafka { - protected final KafkaInfo kafkaInfo; + private final String name; + final KafkaInfo kafkaInfo; private final AafCredentials aafCredentials; - GsonKafka(@NotNull KafkaInfo kafkaInfo, + GsonKafka( + @NotNull String name, + @NotNull KafkaInfo kafkaInfo, @Nullable AafCredentials aafCredentials) { + this.name = Objects.requireNonNull(name, "name"); this.kafkaInfo = Objects.requireNonNull(kafkaInfo, "kafkaInfo"); this.aafCredentials = aafCredentials; } + public String name() { + return name; + } + @Override public String bootstrapServers() { return kafkaInfo.bootstrapServers(); diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java index c45f8470..4990f80a 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java @@ -18,7 +18,7 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -32,8 +32,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma class GsonKafkaSink extends GsonKafka implements KafkaSink { GsonKafkaSink( + @NotNull String name, @NotNull KafkaInfo kafkaInfo, @Nullable AafCredentials aafCredentials) { - super(kafkaInfo, aafCredentials); + super(name, kafkaInfo, aafCredentials); } + } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java index 1509d9d7..137964c3 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java @@ -18,7 +18,7 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -32,9 +32,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma class GsonKafkaSource extends GsonKafka implements KafkaSource { GsonKafkaSource( + @NotNull String name, @NotNull KafkaInfo kafkaInfo, @Nullable AafCredentials aafCredentials) { - super(kafkaInfo, aafCredentials); + super(name, kafkaInfo, aafCredentials); } @Override diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java index 8b17a8d4..fd5602e6 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java @@ -18,7 +18,7 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import com.google.gson.annotations.SerializedName; import org.immutables.gson.Gson; diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java index f9a546c2..59373c45 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java @@ -18,16 +18,21 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo; import com.google.gson.Gson; -import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE; /** @@ -46,12 +51,13 @@ public final class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> { } @Override - public KafkaSink unsafeParse(JsonObject input) { - assertStreamType(input, KAFKA_TYPE); + public KafkaSink unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SINK); + final JsonObject json = input.descriptor(); - final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME); - final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json); + final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull(); - return new GsonKafkaSink(kafkaInfo, null); + return new GsonKafkaSink(input.name(), kafkaInfo, aafCreds); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java index 08c02b47..6ac1dc99 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java @@ -18,16 +18,21 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo; import com.google.gson.Gson; -import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE; /** @@ -46,12 +51,13 @@ public final class KafkaSourceParser implements StreamFromGsonParser<KafkaSource } @Override - public KafkaSource unsafeParse(JsonObject input) { - assertStreamType(input, KAFKA_TYPE); + public KafkaSource unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SOURCE); + final JsonObject json = input.descriptor(); - final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME); - final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json); + final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull(); - return new GsonKafkaSource(kafkaInfo, null); + return new GsonKafkaSource(input.name(), kafkaInfo, aafCreds); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java new file mode 100644 index 00000000..4cfa33ac --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.optionalChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +final class KafkaUtils { + + private KafkaUtils() { + } + + static KafkaInfo extractKafkaInfo(Gson gson, JsonObject input) { + final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info"); + return gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + } + + static Option<AafCredentials> extractAafCredentials(Gson gson, JsonObject input) { + return optionalChild(input, "aaf_credentials") + .map(aafCredsJson -> gson.fromJson(aafCredsJson, ImmutableAafCredentials.class)); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java index e8d63192..c3c70b78 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java @@ -36,9 +36,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; @Gson.TypeAdapters public interface AafCredentials { - @SerializedName("aaf_username") + @SerializedName(value = "username", alternate = "aaf_username") @Nullable String username(); - @SerializedName("aaf_password") + @SerializedName(value = "password", alternate = "aaf_password") @Nullable String password(); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java index 43d9d726..37bf7e57 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; +import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; /** @@ -28,5 +29,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; */ @ExperimentalApi public interface DataStream { - + @Value.Default + default String name() { + return ""; + } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java new file mode 100644 index 00000000..f3cac547 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +public enum DataStreamDirection { + + SINK("streams_publishes"), + SOURCE("streams_subscribes"); + + private final String configurationKey; + + DataStreamDirection(String configurationKey) { + this.configurationKey = configurationKey; + } + + public String configurationKey() { + return configurationKey; + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java new file mode 100644 index 00000000..7a39ede5 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; + +import org.immutables.value.Value; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +@Value.Immutable +public interface RawDataStream<T> { + String name(); + String type(); + DataStreamDirection direction(); + T descriptor(); +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java index 97f07a29..1810fc6c 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java @@ -20,6 +20,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; +import static io.vavr.Predicates.not; + +import io.vavr.collection.List; import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; @@ -46,4 +49,9 @@ public interface Kafka { default int maxPayloadSizeBytes() { return 1024 * 1024; } + + @Value.Derived + default List<String> bootstrapServerList() { + return List.of(bootstrapServers().split(",")).filter(not(String::isEmpty)); + } } |