From 52b5f5276add6116a14ba671c35ff9d87c140421 Mon Sep 17 00:00:00 2001 From: Tomasz Wrobel Date: Fri, 15 Jan 2021 14:34:18 +0100 Subject: Add Native Kafka streams support in bp-generator Issue-ID: DCAEGEN2-1179 Signed-off-by: Tomasz Wrobel Change-Id: I541dca959707a41c56205e20c9f5a56ccec5ca41 --- .../service/common/AppConfigService.java | 10 +- .../service/common/StreamService.java | 42 ++++--- .../service/common/kafka/AafCredential.java | 52 ++++++++ .../service/common/kafka/KafkaCommonConstants.java | 39 ++++++ .../service/common/kafka/KafkaInfo.java | 56 +++++++++ .../service/common/kafka/KafkaStream.java | 61 +++++++++ .../service/common/kafka/KafkaStreamService.java | 137 +++++++++++++++++++++ 7 files changed, 378 insertions(+), 19 deletions(-) create mode 100644 mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/AafCredential.java create mode 100644 mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaCommonConstants.java create mode 100644 mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaInfo.java create mode 100644 mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStream.java create mode 100644 mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamService.java (limited to 'mod/bpgenerator/onap/src/main/java') diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/AppConfigService.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/AppConfigService.java index f0d0e50..3db5ea0 100644 --- a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/AppConfigService.java +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/AppConfigService.java @@ -27,13 +27,11 @@ package org.onap.blueprintgenerator.service.common; import org.onap.blueprintgenerator.constants.Constants; import org.onap.blueprintgenerator.model.common.Appconfig; -import org.onap.blueprintgenerator.model.common.Dmaap; +import org.onap.blueprintgenerator.model.common.BaseStream; import org.onap.blueprintgenerator.model.common.GetInput; import org.onap.blueprintgenerator.model.componentspec.OnapComponentSpec; import org.onap.blueprintgenerator.model.componentspec.common.Calls; import org.onap.blueprintgenerator.model.componentspec.common.Parameters; -import org.onap.blueprintgenerator.model.componentspec.common.Publishes; -import org.onap.blueprintgenerator.model.componentspec.common.Subscribes; import org.onap.blueprintgenerator.service.base.BlueprintHelperService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -60,6 +58,7 @@ public class AppConfigService { @Autowired private StreamService streamService; + /** * Creates Inputs section under App Config with Publishes, Subscribes, Parameters sections by * checking Datarouter/MessageRouter/override/Dmaap values @@ -82,9 +81,9 @@ public class AppConfigService { Calls[] call = new Calls[0]; appconfig.setService_calls(call); - Map streamPublishes = streamService.createStreamPublishes( + Map streamPublishes = streamService.createStreamPublishes( onapComponentSpec, blueprintHelperService, dmaapService, inputs, isDmaap); - Map streamSubscribes = streamService.createStreamSubscribes( + Map streamSubscribes = streamService.createStreamSubscribes( onapComponentSpec, blueprintHelperService, dmaapService, inputs, isDmaap); appconfig.setStreams_publishes(streamPublishes); @@ -132,4 +131,5 @@ public class AppConfigService { response.put("inputs", inputs); return response; } + } diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java index bd4cf87..f27ea48 100644 --- a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java @@ -29,39 +29,45 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.TreeMap; import org.onap.blueprintgenerator.constants.Constants; +import org.onap.blueprintgenerator.model.common.BaseStream; import org.onap.blueprintgenerator.model.common.Dmaap; import org.onap.blueprintgenerator.model.componentspec.OnapComponentSpec; import org.onap.blueprintgenerator.model.componentspec.common.Publishes; import org.onap.blueprintgenerator.model.componentspec.common.Subscribes; import org.onap.blueprintgenerator.service.base.BlueprintHelperService; +import org.onap.blueprintgenerator.service.common.kafka.KafkaStreamService; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author : Joanna Jeremicz - * @date 01/15/2021 Application: ONAP - Blueprint Generator Common ONAP Service - * to create publishes and subscribes streams + * @date 01/15/2021 Application: ONAP - Blueprint Generator Common ONAP Service to create publishes and subscribes + * streams */ @Service("streamService") public class StreamService { + @Autowired + private KafkaStreamService kafkaStreamsService; + /** * Creates publishes stream for given Inputs and ComponentSpec * - * @param onapComponentSpec Onap Component Specification + * @param onapComponentSpec Onap Component Specification * @param blueprintHelperService Blueprint Helper Service - * @param dmaapService Dmaap Service - * @param inputs Inputs - * @param isDmaap Dmaap Argument + * @param dmaapService Dmaap Service + * @param inputs Inputs + * @param isDmaap Dmaap Argument * @return */ - public Map createStreamPublishes( + public Map createStreamPublishes( OnapComponentSpec onapComponentSpec, BlueprintHelperService blueprintHelperService, DmaapService dmaapService, Map> inputs, boolean isDmaap) { - Map streamPublishes = new TreeMap<>(); + Map streamPublishes = new TreeMap<>(); if (onapComponentSpec.getStreams() == null || onapComponentSpec.getStreams().getPublishes() == null) { return streamPublishes; } @@ -90,6 +96,9 @@ public class StreamService { Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap"); dmaap.setType(publishes.getType()); streamPublishes.put(config, dmaap); + } else if (blueprintHelperService.isKafkaStreamType(publishes.getType())) { + inputs.putAll(kafkaStreamsService.createStreamPublishInputs(publishes.getConfig_key())); + streamPublishes.putAll(kafkaStreamsService.createAppPropertiesPublish(publishes.getConfig_key())); } } return streamPublishes; @@ -98,21 +107,21 @@ public class StreamService { /** * Creates subscribes stream for given Inputs and ComponentSpec * - * @param onapComponentSpec Onap Component Specification + * @param onapComponentSpec Onap Component Specification * @param blueprintHelperService Blueprint Helper Service - * @param dmaapService Dmaap Service - * @param inputs Inputs - * @param isDmaap Dmaap Argument + * @param dmaapService Dmaap Service + * @param inputs Inputs + * @param isDmaap Dmaap Argument * @return */ - public Map createStreamSubscribes( + public Map createStreamSubscribes( OnapComponentSpec onapComponentSpec, BlueprintHelperService blueprintHelperService, DmaapService dmaapService, Map> inputs, boolean isDmaap) { - Map streamSubscribes = new TreeMap<>(); + Map streamSubscribes = new TreeMap<>(); if (onapComponentSpec.getStreams() == null || onapComponentSpec.getStreams().getSubscribes() == null) { return streamSubscribes; } @@ -141,7 +150,12 @@ public class StreamService { Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap"); dmaap.setType(subscribes.getType()); streamSubscribes.put(config, dmaap); + } else if (blueprintHelperService.isKafkaStreamType(subscribes.getType())) { + inputs.putAll(kafkaStreamsService.createStreamSubscribeInputs(subscribes.getConfig_key())); + streamSubscribes.putAll(kafkaStreamsService.createAppPropertiesSubscribe(subscribes.getConfig_key())); } + + } return streamSubscribes; } diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/AafCredential.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/AafCredential.java new file mode 100644 index 0000000..072a54c --- /dev/null +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/AafCredential.java @@ -0,0 +1,52 @@ +/* + * + * * ============LICENSE_START======================================================= + * * org.onap.dcae + * * ================================================================================ + * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END========================================================= + * + * + */ + +package org.onap.blueprintgenerator.service.common.kafka; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import lombok.Data; +import org.onap.blueprintgenerator.model.common.GetInput; + +/** + * @author : Tomasz Wrobel + * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator + * Applications Common Model: A model class which represents AafCredential + */ + +@Data +@JsonInclude(value = Include.NON_NULL) +public class AafCredential { + + private GetInput username; + + private GetInput password; + + public AafCredential(String usernameInput, String passwordInput) { + + this.username = new GetInput(usernameInput); + + this.password = new GetInput(passwordInput); + + } +} diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaCommonConstants.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaCommonConstants.java new file mode 100644 index 0000000..0a040be --- /dev/null +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaCommonConstants.java @@ -0,0 +1,39 @@ +/* + * + * * ============LICENSE_START======================================================= + * * org.onap.dcae + * * ================================================================================ + * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END========================================================= + * + * + */ + +package org.onap.blueprintgenerator.service.common.kafka; + +/** + * @author : Tomasz Wrobel + * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator + * Class which contains Kafka Constants + */ + +public class KafkaCommonConstants { + + public static final String KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME = "kafka_bootstrap_servers"; + + public static final String AFF_KAFKA_USER_INPUT_NAME = "kafka_username"; + public static final String AAF_KAFKA_PASSWORD_INPUT_NAME = "kafka_password"; + +} diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaInfo.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaInfo.java new file mode 100644 index 0000000..c03eaf0 --- /dev/null +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaInfo.java @@ -0,0 +1,56 @@ +/* + * + * * ============LICENSE_START======================================================= + * * org.onap.dcae + * * ================================================================================ + * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END========================================================= + * + * + */ + +package org.onap.blueprintgenerator.service.common.kafka; + +import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import lombok.Data; +import org.onap.blueprintgenerator.model.common.GetInput; + + +/** + * @author : Tomasz Wrobel + * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator + * Applications Common Model: A model class which represents Kafka Info + */ + +@Data +@JsonInclude(value = Include.NON_NULL) +public class KafkaInfo { + + private GetInput bootstrap_servers; + + private GetInput topic_name; + + public KafkaInfo(String topicName) { + + this.bootstrap_servers = new GetInput(KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME); + + this.topic_name = new GetInput(topicName); + + } + +} diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStream.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStream.java new file mode 100644 index 0000000..ca92f69 --- /dev/null +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStream.java @@ -0,0 +1,61 @@ +/* + * + * * ============LICENSE_START======================================================= + * * org.onap.dcae + * * ================================================================================ + * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END========================================================= + * + * + */ + + +package org.onap.blueprintgenerator.service.common.kafka; + +import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AAF_KAFKA_PASSWORD_INPUT_NAME; +import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AFF_KAFKA_USER_INPUT_NAME; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.Data; +import org.onap.blueprintgenerator.model.common.BaseStream; + + +/** + * @author : Tomasz Wrobel + * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator + * Applications Common Model: A model class which represents Kafka Stream + */ + +@Data +@JsonInclude(value = JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class KafkaStream implements BaseStream { + + private final String type = "kafka"; + + @JsonProperty("aaf_credentials") + private AafCredential aafCredential; + + @JsonProperty("kafka_info") + private KafkaInfo kafkaInfo; + + public KafkaStream(String topicName) { + this.aafCredential = new AafCredential(AFF_KAFKA_USER_INPUT_NAME, AAF_KAFKA_PASSWORD_INPUT_NAME); + this.kafkaInfo = new KafkaInfo(topicName); + } +} diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamService.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamService.java new file mode 100644 index 0000000..2090ef0 --- /dev/null +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamService.java @@ -0,0 +1,137 @@ +/* + * + * * ============LICENSE_START======================================================= + * * org.onap.dcae + * * ================================================================================ + * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END========================================================= + * + * + */ + +package org.onap.blueprintgenerator.service.common.kafka; + + +import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AAF_KAFKA_PASSWORD_INPUT_NAME; +import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AFF_KAFKA_USER_INPUT_NAME; +import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME; + +import java.util.LinkedHashMap; +import java.util.Map; +import org.onap.blueprintgenerator.service.base.BlueprintHelperService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * @author : Tomasz Wrobel + * @date 18/01/2021 Application: ONAP - Blueprint Generator Common ONAP Service used to create Kafka Stream application + * config object and Kafka Stream inputs + */ +@Service +public class KafkaStreamService { + + private static final String PUBLISH_URL_SUFFIX = "_publish_url"; + private static final String SUBSCRIBE_URL_SUFFIX = "_subscribe_url"; + private static final String DEFAULT_STREAM_URL = "sample_stream_url"; + private static final String DEFAULT_BOOTSTRAP_SERVER = "message-router-kafka:9092"; + private static final String DEFAULT_AAF_USER = "admin"; + private static final String DEFAULT_AAF_PASSWORD = "admin_secret"; + + @Autowired + private BlueprintHelperService blueprintHelperService; + + + /** + * Creates publish stream inputs for given streamName + * + * @param streamName Stream name + * @return + */ + public Map> createStreamPublishInputs(String streamName) { + return createStreamInputs(streamName + PUBLISH_URL_SUFFIX); + } + + /** + * Creates subscribe stream inputs for given streamName + * + * @param streamName Stream name + * @return + */ + public Map> createStreamSubscribeInputs(String streamName) { + return createStreamInputs(streamName + SUBSCRIBE_URL_SUFFIX); + } + + /** + * Creates Application properties publish stream object for given streamName + * + * @param streamName Stream name + * @return + */ + public Map createAppPropertiesPublish(String streamName) { + + LinkedHashMap kafkaStreamMap = new LinkedHashMap<>(); + KafkaStream kafkaStream = createAppProperties(streamName, PUBLISH_URL_SUFFIX); + + kafkaStreamMap.put(streamName, kafkaStream); + + return kafkaStreamMap; + } + + /** + * Creates Application properties subscribe stream object for given streamName + * + * @param streamName Stream name + * @return + */ + public Map createAppPropertiesSubscribe(String streamName) { + + LinkedHashMap kafkaStreamMap = new LinkedHashMap<>(); + KafkaStream kafkaStream = createAppProperties(streamName, SUBSCRIBE_URL_SUFFIX); + + kafkaStreamMap.put(streamName, kafkaStream); + + return kafkaStreamMap; + } + + private KafkaStream createAppProperties(String streamName, String urlSuffix) { + String topicName = streamName + urlSuffix; + + return new KafkaStream(topicName); + } + + private Map> createStreamInputs(String streamName) { + LinkedHashMap> streamInputs = createBaseInputs(); + LinkedHashMap stream = + blueprintHelperService.createStringInput(DEFAULT_STREAM_URL); + streamInputs.put(streamName, stream); + return streamInputs; + } + + private LinkedHashMap> createBaseInputs() { + LinkedHashMap> baseInputs = new LinkedHashMap<>(); + + LinkedHashMap kafka_message_router = blueprintHelperService + .createStringInput(DEFAULT_BOOTSTRAP_SERVER); + baseInputs.put(KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME, kafka_message_router); + + LinkedHashMap kafka_username = blueprintHelperService.createStringInput(DEFAULT_AAF_USER); + baseInputs.put(AFF_KAFKA_USER_INPUT_NAME, kafka_username); + + LinkedHashMap kafka_password = blueprintHelperService.createStringInput(DEFAULT_AAF_PASSWORD); + baseInputs.put(AAF_KAFKA_PASSWORD_INPUT_NAME, kafka_password); + + return baseInputs; + } +} -- cgit 1.2.3-korg