From c852448b1e6ca5e28e0b0dab26c2af3d5af3f390 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 14 Mar 2019 08:38:15 +0100 Subject: Implement Kafka stream definition parsers Change-Id: I43215c1c2494b6036deb004891fb76bfd2b74474 Issue-ID: DCAEGEN2-1341 Signed-off-by: Piotr Jaszczyk --- .../client/api/exceptions/StreamParserError.java | 9 +++ .../client/api/streams/StreamFromGsonParser.java | 37 +++++++++ .../client/api/streams/StreamFromGsonParsers.java | 44 +++++++++++ .../cbs/client/api/streams/StreamParser.java | 34 ++++++-- .../cbs/client/impl/streams/gson/GsonKafka.java | 92 ++++++++++++++++++++++ .../client/impl/streams/gson/GsonKafkaSink.java | 39 +++++++++ .../client/impl/streams/gson/GsonKafkaSource.java | 45 +++++++++++ .../cbs/client/impl/streams/gson/GsonUtils.java | 89 +++++++++++++++++++++ .../cbs/client/impl/streams/gson/KafkaInfo.java | 56 +++++++++++++ .../client/impl/streams/gson/KafkaSinkParser.java | 58 ++++++++++++++ .../impl/streams/gson/KafkaSourceParser.java | 57 ++++++++++++++ .../cbs/client/model/streams/AafCredentials.java | 4 + .../cbs/client/model/streams/SinkStream.java | 2 +- .../cbs/client/model/streams/SourceStream.java | 2 +- .../cbs/client/model/streams/dmaap/Kafka.java | 49 ++++++++++++ .../cbs/client/model/streams/dmaap/KafkaSink.java | 35 ++++++++ .../client/model/streams/dmaap/KafkaSource.java | 37 +++++++++ 17 files changed, 681 insertions(+), 8 deletions(-) create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java (limited to 'rest-services/cbs-client/src/main/java/org') diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java index 214f0e4a..cbdea005 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions; +import io.vavr.control.Either; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; /** @@ -30,6 +31,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; public class StreamParserError { private final String message; + public static Either left(Throwable ex) { + return Either.left(fromThrowable(ex)); + } + + public static StreamParserError fromThrowable(Throwable ex) { + return new StreamParserError(ex.getMessage()); + } + public StreamParserError(String message) { this.message = message; } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java new file mode 100644 index 00000000..f18f2175 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; + +import com.google.gson.JsonObject; +import io.vavr.control.Either; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +@ExperimentalApi +public interface StreamFromGsonParser extends StreamParser { + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java new file mode 100644 index 00000000..4b0223fd --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; + +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.KafkaSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.KafkaSourceParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +public final class StreamFromGsonParsers { + + private StreamFromGsonParsers() { + } + + public static StreamFromGsonParser kafkaSinkParser() { + return KafkaSinkParser.create(); + } + + public static StreamFromGsonParser kafkaSourceParser() { + return KafkaSourceParser.create(); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java index 9ba7047a..3467c809 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java @@ -21,22 +21,44 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; import io.vavr.control.Either; +import io.vavr.control.Try; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; /** + * A generic data stream parser which parses {@code T} to data stream {@code S}. + * * @author Piotr Jaszczyk - * @since 1.1.2 + * @param input data type, eg. Gson Object + * @param output data type + * @since 1.1.3 */ @ExperimentalApi -public interface StreamParser { +public interface StreamParser { - Either parse(MerkleTree subtree); + /** + * Parse the input data {@code T} producing the {@link DataStream}. + * + * @param input - the input data + * @return Right(parsing result) or Left(parsing error) + */ + default Either parse(T input) { + return Try.of(() -> unsafeParse(input)) + .toEither() + .mapLeft(StreamParserError::fromThrowable); + } - default S unsafeParse(MerkleTree subtree) { - return parse(subtree).getOrElseThrow(StreamParsingException::new); + /** + * Parse the input data {@code T} producing the {@link DataStream}. Will throw StreamParsingException when input + * was invalid. + * + * @param input - the input data + * @return parsing result + * @throws StreamParsingException when parsing was unsuccessful + */ + default S unsafeParse(T input) { + return parse(input).getOrElseThrow(StreamParsingException::new); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java new file mode 100644 index 00000000..ecafd30c --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java @@ -0,0 +1,92 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import java.util.Objects; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +abstract class GsonKafka implements Kafka { + + protected final KafkaInfo kafkaInfo; + private final AafCredentials aafCredentials; + + GsonKafka(@NotNull KafkaInfo kafkaInfo, + @Nullable AafCredentials aafCredentials) { + this.kafkaInfo = Objects.requireNonNull(kafkaInfo, "kafkaInfo"); + this.aafCredentials = aafCredentials; + } + + @Override + public String bootstrapServers() { + return kafkaInfo.bootstrapServers(); + } + + @Override + public String topicName() { + return kafkaInfo.topicName(); + } + + @Override + public @Nullable AafCredentials aafCredentials() { + return aafCredentials; + } + + @Override + public @Nullable String clientRole() { + return kafkaInfo.clientRole(); + } + + @Override + public @Nullable String clientId() { + return kafkaInfo.clientId(); + } + + @Override + public int maxPayloadSizeBytes() { + return kafkaInfo.maxPayloadSizeBytes(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GsonKafka gsonKafka = (GsonKafka) o; + return kafkaInfo.equals(gsonKafka.kafkaInfo) && + Objects.equals(aafCredentials, gsonKafka.aafCredentials); + } + + @Override + public int hashCode() { + return Objects.hash(kafkaInfo, aafCredentials); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java new file mode 100644 index 00000000..c45f8470 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +class GsonKafkaSink extends GsonKafka implements KafkaSink { + + GsonKafkaSink( + @NotNull KafkaInfo kafkaInfo, + @Nullable AafCredentials aafCredentials) { + super(kafkaInfo, aafCredentials); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java new file mode 100644 index 00000000..1509d9d7 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +class GsonKafkaSource extends GsonKafka implements KafkaSource { + + GsonKafkaSource( + @NotNull KafkaInfo kafkaInfo, + @Nullable AafCredentials aafCredentials) { + super(kafkaInfo, aafCredentials); + } + + @Override + public @Nullable String consumerGroupId() { + return kafkaInfo.consumerGroupId(); + } + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java new file mode 100644 index 00000000..a813607e --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java @@ -0,0 +1,89 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.vavr.Lazy; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +final class GsonUtils { + private static final Lazy GSON = Lazy.of(() -> { + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersKafkaInfo()); + gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersAafCredentials()); + return gsonBuilder.create(); + }); + + private GsonUtils() { + } + + static Gson gsonInstance() { + return GSON.get(); + } + + static void assertStreamType(JsonObject json, String expectedType) { + final String actualType = requiredString(json, "type"); + if (!actualType.equals(expectedType)) { + throw new IllegalArgumentException("Invalid stream type. Expected '" + expectedType + "', but was '" + actualType + "'"); + } + } + + static String requiredString(JsonObject parent, String childName) { + return requiredChild(parent, childName).getAsString(); + } + + static JsonElement requiredChild(JsonObject parent, String childName) { + if (parent.has(childName)) { + return parent.get(childName); + } else { + throw new IllegalArgumentException( + "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + stringifyChildrenNames(parent)); + } + } + + static JsonObject readObjectFromResource(String resource) throws IOException { + return readFromResource(resource).getAsJsonObject(); + } + + static JsonElement readFromResource(String resource) throws IOException { + try (Reader reader = new InputStreamReader(GsonUtils.class.getResourceAsStream(resource))) { + return new JsonParser().parse(reader); + } + } + + private static String stringifyChildrenNames(JsonObject parent) { + return parent.entrySet().stream().map(Entry::getKey).collect(Collectors.joining(", ")); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java new file mode 100644 index 00000000..8b17a8d4 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import com.google.gson.annotations.SerializedName; +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +@Value.Immutable +@Gson.TypeAdapters +public interface KafkaInfo { + + @SerializedName("bootstrap_servers") + String bootstrapServers(); + + @SerializedName("topic_name") + String topicName(); + + @SerializedName("consumer_group_id") + @Nullable String consumerGroupId(); + + @SerializedName("client_role") + @Nullable String clientRole(); + + @SerializedName("client_id") + @Nullable String clientId(); + + @Value.Default + @SerializedName("max_payload_size_bytes") + default int maxPayloadSizeBytes() { + return 1024 * 1024; + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java new file mode 100644 index 00000000..393fe40f --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +public class KafkaSinkParser implements StreamFromGsonParser { + + private final Gson gson; + + public static KafkaSinkParser create() { + return new KafkaSinkParser(gsonInstance()); + } + + KafkaSinkParser(Gson gson) { + this.gson = gson; + } + + @Override + public KafkaSink unsafeParse(JsonObject input) { + assertStreamType(input, "kafka"); + + final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info"); + final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + + return new GsonKafkaSink(kafkaInfo, null); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java new file mode 100644 index 00000000..8b48a2a4 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +public class KafkaSourceParser implements StreamFromGsonParser { + private final Gson gson; + + public static KafkaSourceParser create() { + return new KafkaSourceParser(gsonInstance()); + } + + KafkaSourceParser(Gson gson) { + this.gson = gson; + } + + @Override + public KafkaSource unsafeParse(JsonObject input) { + assertStreamType(input, "kafka"); + + final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info"); + final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + + return new GsonKafkaSource(kafkaInfo, null); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java index 11481561..ecb0b553 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java @@ -21,6 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; +import org.immutables.gson.Gson; +import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; @@ -29,6 +31,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; * @version 1.2.1 */ @ExperimentalApi +@Value.Immutable +@Gson.TypeAdapters public interface AafCredentials { @Nullable String username(); diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java index 3ccce215..e3389207 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java @@ -29,6 +29,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; * @version 1.2.1 */ @ExperimentalApi -public interface SinkStream { +public interface SinkStream extends DataStream { } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java index 78bb5b51..2bea143b 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java @@ -29,6 +29,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; * @version 1.2.1 */ @ExperimentalApi -public interface SourceStream { +public interface SourceStream extends DataStream { } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java new file mode 100644 index 00000000..97f07a29 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java @@ -0,0 +1,49 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; + +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +@ExperimentalApi +public interface Kafka { + + String bootstrapServers(); + + String topicName(); + + @Nullable AafCredentials aafCredentials(); + + @Nullable String clientRole(); + + @Nullable String clientId(); + + @Value.Default + default int maxPayloadSizeBytes() { + return 1024 * 1024; + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java new file mode 100644 index 00000000..514881fe --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; + +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +@ExperimentalApi +@Value.Immutable +public abstract interface KafkaSink extends Kafka, SinkStream { + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java new file mode 100644 index 00000000..65280a98 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; + +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream; + +/** + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +@ExperimentalApi +@Value.Immutable +public interface KafkaSource extends Kafka, SourceStream { + + @Nullable String consumerGroupId(); +} -- cgit 1.2.3-korg