From c852448b1e6ca5e28e0b0dab26c2af3d5af3f390 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 14 Mar 2019 08:38:15 +0100 Subject: Implement Kafka stream definition parsers Change-Id: I43215c1c2494b6036deb004891fb76bfd2b74474 Issue-ID: DCAEGEN2-1341 Signed-off-by: Piotr Jaszczyk --- rest-services/cbs-client/pom.xml | 97 +++++++++-------- .../client/api/exceptions/StreamParserError.java | 9 ++ .../client/api/streams/StreamFromGsonParser.java | 37 +++++++ .../client/api/streams/StreamFromGsonParsers.java | 44 ++++++++ .../cbs/client/api/streams/StreamParser.java | 34 ++++-- .../cbs/client/impl/streams/gson/GsonKafka.java | 92 +++++++++++++++++ .../client/impl/streams/gson/GsonKafkaSink.java | 39 +++++++ .../client/impl/streams/gson/GsonKafkaSource.java | 45 ++++++++ .../cbs/client/impl/streams/gson/GsonUtils.java | 89 ++++++++++++++++ .../cbs/client/impl/streams/gson/KafkaInfo.java | 56 ++++++++++ .../client/impl/streams/gson/KafkaSinkParser.java | 58 +++++++++++ .../impl/streams/gson/KafkaSourceParser.java | 57 ++++++++++ .../cbs/client/model/streams/AafCredentials.java | 4 + .../cbs/client/model/streams/SinkStream.java | 2 +- .../cbs/client/model/streams/SourceStream.java | 2 +- .../cbs/client/model/streams/dmaap/Kafka.java | 49 +++++++++ .../cbs/client/model/streams/dmaap/KafkaSink.java | 35 +++++++ .../client/model/streams/dmaap/KafkaSource.java | 37 +++++++ .../impl/streams/gson/KafkaSinkParserTest.java | 115 +++++++++++++++++++++ .../impl/streams/gson/KafkaSourceParserTest.java | 60 +++++++++++ .../src/test/resources/streams/kafka_sink.json | 9 ++ .../resources/streams/kafka_sink_invalid_type.json | 7 ++ .../test/resources/streams/kafka_sink_minimal.json | 7 ++ .../streams/kafka_sink_missing_child.json | 3 + .../resources/streams/kafka_source_minimal.json | 7 ++ 25 files changed, 937 insertions(+), 57 deletions(-) create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java create mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java create mode 100644 rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java create mode 100644 rest-services/cbs-client/src/test/resources/streams/kafka_sink.json create mode 100644 rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json create mode 100644 rest-services/cbs-client/src/test/resources/streams/kafka_sink_minimal.json create mode 100644 rest-services/cbs-client/src/test/resources/streams/kafka_sink_missing_child.json create mode 100644 rest-services/cbs-client/src/test/resources/streams/kafka_source_minimal.json (limited to 'rest-services') diff --git a/rest-services/cbs-client/pom.xml b/rest-services/cbs-client/pom.xml index 70c11a1c..9544a7fe 100644 --- a/rest-services/cbs-client/pom.xml +++ b/rest-services/cbs-client/pom.xml @@ -1,58 +1,57 @@ - 4.0.0 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - - org.onap.dcaegen2.services.sdk - dcaegen2-services-sdk-rest-services - 1.1.4-SNAPSHOT - + + org.onap.dcaegen2.services.sdk + dcaegen2-services-sdk-rest-services + 1.1.4-SNAPSHOT + - org.onap.dcaegen2.services.sdk.rest.services - cbs-client + org.onap.dcaegen2.services.sdk.rest.services + cbs-client - dcaegen2-services-sdk-rest-services-cbs-client - Config Binding Service Rest Services Module - jar + dcaegen2-services-sdk-rest-services-cbs-client + Config Binding Service Rest Services Module + jar - - - org.onap.dcaegen2.services.sdk.rest.services - common-dependency - ${project.version} - - - io.vavr - vavr - - - - org.jetbrains - annotations - + + + org.onap.dcaegen2.services.sdk.rest.services + common-dependency + ${project.version} + + + io.vavr + vavr + + + org.jetbrains + annotations + - - org.mockito - mockito-core - test - - - org.junit.jupiter - junit-jupiter-engine - test - - - org.assertj - assertj-core - test - - - io.projectreactor - reactor-test - test - - + + org.mockito + mockito-core + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.assertj + assertj-core + test + + + io.projectreactor + reactor-test + test + + diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java index 214f0e4a..cbdea005 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions; +import io.vavr.control.Either; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; /** @@ -30,6 +31,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; public class StreamParserError { private final String message; + public static Either left(Throwable ex) { + return Either.left(fromThrowable(ex)); + } + + public static StreamParserError fromThrowable(Throwable ex) { + return new StreamParserError(ex.getMessage()); + } + public StreamParserError(String message) { this.message = message; } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java new file mode 100644 index 00000000..f18f2175 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; + +import com.google.gson.JsonObject; +import io.vavr.control.Either; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +@ExperimentalApi +public interface StreamFromGsonParser extends StreamParser { + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java new file mode 100644 index 00000000..4b0223fd --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; + +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.KafkaSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.KafkaSourceParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +public final class StreamFromGsonParsers { + + private StreamFromGsonParsers() { + } + + public static StreamFromGsonParser kafkaSinkParser() { + return KafkaSinkParser.create(); + } + + public static StreamFromGsonParser kafkaSourceParser() { + return KafkaSourceParser.create(); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java index 9ba7047a..3467c809 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java @@ -21,22 +21,44 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; import io.vavr.control.Either; +import io.vavr.control.Try; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; /** + * A generic data stream parser which parses {@code T} to data stream {@code S}. + * * @author Piotr Jaszczyk - * @since 1.1.2 + * @param input data type, eg. Gson Object + * @param output data type + * @since 1.1.3 */ @ExperimentalApi -public interface StreamParser { +public interface StreamParser { - Either parse(MerkleTree subtree); + /** + * Parse the input data {@code T} producing the {@link DataStream}. + * + * @param input - the input data + * @return Right(parsing result) or Left(parsing error) + */ + default Either parse(T input) { + return Try.of(() -> unsafeParse(input)) + .toEither() + .mapLeft(StreamParserError::fromThrowable); + } - default S unsafeParse(MerkleTree subtree) { - return parse(subtree).getOrElseThrow(StreamParsingException::new); + /** + * Parse the input data {@code T} producing the {@link DataStream}. Will throw StreamParsingException when input + * was invalid. + * + * @param input - the input data + * @return parsing result + * @throws StreamParsingException when parsing was unsuccessful + */ + default S unsafeParse(T input) { + return parse(input).getOrElseThrow(StreamParsingException::new); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java new file mode 100644 index 00000000..ecafd30c --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java @@ -0,0 +1,92 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import java.util.Objects; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +abstract class GsonKafka implements Kafka { + + protected final KafkaInfo kafkaInfo; + private final AafCredentials aafCredentials; + + GsonKafka(@NotNull KafkaInfo kafkaInfo, + @Nullable AafCredentials aafCredentials) { + this.kafkaInfo = Objects.requireNonNull(kafkaInfo, "kafkaInfo"); + this.aafCredentials = aafCredentials; + } + + @Override + public String bootstrapServers() { + return kafkaInfo.bootstrapServers(); + } + + @Override + public String topicName() { + return kafkaInfo.topicName(); + } + + @Override + public @Nullable AafCredentials aafCredentials() { + return aafCredentials; + } + + @Override + public @Nullable String clientRole() { + return kafkaInfo.clientRole(); + } + + @Override + public @Nullable String clientId() { + return kafkaInfo.clientId(); + } + + @Override + public int maxPayloadSizeBytes() { + return kafkaInfo.maxPayloadSizeBytes(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GsonKafka gsonKafka = (GsonKafka) o; + return kafkaInfo.equals(gsonKafka.kafkaInfo) && + Objects.equals(aafCredentials, gsonKafka.aafCredentials); + } + + @Override + public int hashCode() { + return Objects.hash(kafkaInfo, aafCredentials); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java new file mode 100644 index 00000000..c45f8470 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +class GsonKafkaSink extends GsonKafka implements KafkaSink { + + GsonKafkaSink( + @NotNull KafkaInfo kafkaInfo, + @Nullable AafCredentials aafCredentials) { + super(kafkaInfo, aafCredentials); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java new file mode 100644 index 00000000..1509d9d7 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +class GsonKafkaSource extends GsonKafka implements KafkaSource { + + GsonKafkaSource( + @NotNull KafkaInfo kafkaInfo, + @Nullable AafCredentials aafCredentials) { + super(kafkaInfo, aafCredentials); + } + + @Override + public @Nullable String consumerGroupId() { + return kafkaInfo.consumerGroupId(); + } + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java new file mode 100644 index 00000000..a813607e --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java @@ -0,0 +1,89 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.vavr.Lazy; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +final class GsonUtils { + private static final Lazy GSON = Lazy.of(() -> { + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersKafkaInfo()); + gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersAafCredentials()); + return gsonBuilder.create(); + }); + + private GsonUtils() { + } + + static Gson gsonInstance() { + return GSON.get(); + } + + static void assertStreamType(JsonObject json, String expectedType) { + final String actualType = requiredString(json, "type"); + if (!actualType.equals(expectedType)) { + throw new IllegalArgumentException("Invalid stream type. Expected '" + expectedType + "', but was '" + actualType + "'"); + } + } + + static String requiredString(JsonObject parent, String childName) { + return requiredChild(parent, childName).getAsString(); + } + + static JsonElement requiredChild(JsonObject parent, String childName) { + if (parent.has(childName)) { + return parent.get(childName); + } else { + throw new IllegalArgumentException( + "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + stringifyChildrenNames(parent)); + } + } + + static JsonObject readObjectFromResource(String resource) throws IOException { + return readFromResource(resource).getAsJsonObject(); + } + + static JsonElement readFromResource(String resource) throws IOException { + try (Reader reader = new InputStreamReader(GsonUtils.class.getResourceAsStream(resource))) { + return new JsonParser().parse(reader); + } + } + + private static String stringifyChildrenNames(JsonObject parent) { + return parent.entrySet().stream().map(Entry::getKey).collect(Collectors.joining(", ")); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java new file mode 100644 index 00000000..8b17a8d4 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import com.google.gson.annotations.SerializedName; +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +@Value.Immutable +@Gson.TypeAdapters +public interface KafkaInfo { + + @SerializedName("bootstrap_servers") + String bootstrapServers(); + + @SerializedName("topic_name") + String topicName(); + + @SerializedName("consumer_group_id") + @Nullable String consumerGroupId(); + + @SerializedName("client_role") + @Nullable String clientRole(); + + @SerializedName("client_id") + @Nullable String clientId(); + + @Value.Default + @SerializedName("max_payload_size_bytes") + default int maxPayloadSizeBytes() { + return 1024 * 1024; + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java new file mode 100644 index 00000000..393fe40f --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +public class KafkaSinkParser implements StreamFromGsonParser { + + private final Gson gson; + + public static KafkaSinkParser create() { + return new KafkaSinkParser(gsonInstance()); + } + + KafkaSinkParser(Gson gson) { + this.gson = gson; + } + + @Override + public KafkaSink unsafeParse(JsonObject input) { + assertStreamType(input, "kafka"); + + final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info"); + final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + + return new GsonKafkaSink(kafkaInfo, null); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java new file mode 100644 index 00000000..8b48a2a4 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +public class KafkaSourceParser implements StreamFromGsonParser { + private final Gson gson; + + public static KafkaSourceParser create() { + return new KafkaSourceParser(gsonInstance()); + } + + KafkaSourceParser(Gson gson) { + this.gson = gson; + } + + @Override + public KafkaSource unsafeParse(JsonObject input) { + assertStreamType(input, "kafka"); + + final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info"); + final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + + return new GsonKafkaSource(kafkaInfo, null); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java index 11481561..ecb0b553 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java @@ -21,6 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; +import org.immutables.gson.Gson; +import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; @@ -29,6 +31,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; * @version 1.2.1 */ @ExperimentalApi +@Value.Immutable +@Gson.TypeAdapters public interface AafCredentials { @Nullable String username(); diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java index 3ccce215..e3389207 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java @@ -29,6 +29,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; * @version 1.2.1 */ @ExperimentalApi -public interface SinkStream { +public interface SinkStream extends DataStream { } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java index 78bb5b51..2bea143b 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java @@ -29,6 +29,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; * @version 1.2.1 */ @ExperimentalApi -public interface SourceStream { +public interface SourceStream extends DataStream { } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java new file mode 100644 index 00000000..97f07a29 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java @@ -0,0 +1,49 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; + +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +@ExperimentalApi +public interface Kafka { + + String bootstrapServers(); + + String topicName(); + + @Nullable AafCredentials aafCredentials(); + + @Nullable String clientRole(); + + @Nullable String clientId(); + + @Value.Default + default int maxPayloadSizeBytes() { + return 1024 * 1024; + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java new file mode 100644 index 00000000..514881fe --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; + +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +@ExperimentalApi +@Value.Immutable +public abstract interface KafkaSink extends Kafka, SinkStream { + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java new file mode 100644 index 00000000..65280a98 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; + +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +@ExperimentalApi +@Value.Immutable +public interface KafkaSource extends Kafka, SourceStream { + + @Nullable String consumerGroupId(); +} diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java new file mode 100644 index 00000000..b5481203 --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java @@ -0,0 +1,115 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.in; +import static org.junit.jupiter.api.Assertions.*; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.vavr.Function1; +import io.vavr.control.Either; +import java.io.IOException; +import java.io.InputStreamReader; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +class KafkaSinkParserTest { + + private final StreamFromGsonParser cut = StreamFromGsonParsers.kafkaSinkParser(); + + @Test + void precondition_assureInstanceOf() { + assertThat(cut).isInstanceOf(KafkaSinkParser.class); + } + + @Test + void shouldParseMinimalKafkaSinkDefinition() throws IOException { + // given + JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_minimal.json"); + + // when + final KafkaSink result = cut.unsafeParse(input); + + // then + assertThat(result.aafCredentials()).isNull(); + assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); + assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); + assertThat(result.clientId()).isNull(); + assertThat(result.clientRole()).isNull(); + } + + @Test + void shouldParseBasicKafkaSinkDefinition() throws IOException { + // given + JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink.json"); + + // when + final KafkaSink result = cut.unsafeParse(input); + + // then + assertThat(result.aafCredentials()).isNull(); + assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); + assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); + assertThat(result.clientId()).isEqualTo("1500462518108"); + assertThat(result.clientRole()).isEqualTo("com.dcae.member"); + } + + @Test + void shouldReturnErrorWhenStructureIsWrong() throws IOException { + // given + JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_missing_child.json"); + + // when + final Either result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).contains("kafka_info"); + }); + } + + @Test + void shouldReturnErrorWhenTypeIsWrong() throws IOException { + // given + JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_invalid_type.json"); + + // when + final Either result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream type"); + assertThat(error.message()).containsIgnoringCase("kafka"); + assertThat(error.message()).containsIgnoringCase("message_router"); + }); + } +} \ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java new file mode 100644 index 00000000..87131285 --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.gson.JsonObject; +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +class KafkaSourceParserTest { + + private final StreamFromGsonParser cut = StreamFromGsonParsers.kafkaSourceParser(); + + @Test + void precondition_assureInstanceOf() { + assertThat(cut).isInstanceOf(KafkaSourceParser.class); + } + + @Test + void shouldParseMinimalKafkaSourceDefinition() throws IOException { + // given + JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_source_minimal.json"); + + // when + final KafkaSource result = cut.unsafeParse(input); + + // then + assertThat(result.aafCredentials()).isNull(); + assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); + assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); + assertThat(result.clientId()).isNull(); + assertThat(result.clientRole()).isNull(); + } +} \ No newline at end of file diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json new file mode 100644 index 00000000..b60388d5 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json @@ -0,0 +1,9 @@ +{ + "type": "kafka", + "kafka_info": { + "client_role": "com.dcae.member", + "client_id": "1500462518108", + "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060", + "topic_name": "HVVES_PERF3GPP" + } +} diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json new file mode 100644 index 00000000..0ee88adb --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json @@ -0,0 +1,7 @@ +{ + "type": "message_router", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060", + "topic_name": "HVVES_PERF3GPP" + } +} diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_minimal.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_minimal.json new file mode 100644 index 00000000..da8dd4fe --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_minimal.json @@ -0,0 +1,7 @@ +{ + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060", + "topic_name": "HVVES_PERF3GPP" + } +} diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_missing_child.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_missing_child.json new file mode 100644 index 00000000..d2591b3a --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_missing_child.json @@ -0,0 +1,3 @@ +{ + "type": "kafka" +} diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_source_minimal.json b/rest-services/cbs-client/src/test/resources/streams/kafka_source_minimal.json new file mode 100644 index 00000000..da8dd4fe --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_source_minimal.json @@ -0,0 +1,7 @@ +{ + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060", + "topic_name": "HVVES_PERF3GPP" + } +} -- cgit 1.2.3-korg