From 52b5f5276add6116a14ba671c35ff9d87c140421 Mon Sep 17 00:00:00 2001 From: Tomasz Wrobel Date: Fri, 15 Jan 2021 14:34:18 +0100 Subject: Add Native Kafka streams support in bp-generator Issue-ID: DCAEGEN2-1179 Signed-off-by: Tomasz Wrobel Change-Id: I541dca959707a41c56205e20c9f5a56ccec5ca41 --- .../blueprintgenerator/constants/Constants.java | 3 + .../blueprintgenerator/model/common/Appconfig.java | 6 +- .../model/common/BaseStream.java | 37 ++++++ .../blueprintgenerator/model/common/Dmaap.java | 10 +- .../service/base/BlueprintHelperService.java | 12 ++ .../service/common/AppConfigService.java | 10 +- .../service/common/StreamService.java | 42 ++++--- .../service/common/kafka/AafCredential.java | 52 ++++++++ .../service/common/kafka/KafkaCommonConstants.java | 39 ++++++ .../service/common/kafka/KafkaInfo.java | 56 +++++++++ .../service/common/kafka/KafkaStream.java | 61 +++++++++ .../service/common/kafka/KafkaStreamService.java | 137 +++++++++++++++++++++ .../service/common/StreamServiceTest.java | 63 +++++----- .../common/kafka/KafkaStreamServiceTest.java | 125 +++++++++++++++++++ .../service/common/kafka/KafkaStreamTest.java | 98 +++++++++++++++ 15 files changed, 698 insertions(+), 53 deletions(-) create mode 100644 mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/BaseStream.java create mode 100644 mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/AafCredential.java create mode 100644 mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaCommonConstants.java create mode 100644 mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaInfo.java create mode 100644 mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStream.java create mode 100644 mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamService.java create mode 100644 mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamServiceTest.java create mode 100644 mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamTest.java diff --git a/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/constants/Constants.java b/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/constants/Constants.java index ac654a2..fbd0527 100644 --- a/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/constants/Constants.java +++ b/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/constants/Constants.java @@ -5,6 +5,8 @@ * * ================================================================================ * * Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. * * ================================================================================ + * * Modifications Copyright (c) 2021 Nokia + * * ================================================================================ * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. * * You may obtain a copy of the License at @@ -39,6 +41,7 @@ public class Constants { public static final String DATA_ROUTER = "data_router"; public static final String MESSAGEROUTER_VALUE = "message router"; public static final String MESSAGE_ROUTER = "message_router"; + public static final String KAFKA_TYPE = "kafka"; public static final String TOSCA_DEF_VERSION = "cloudify_dsl_1_3"; public static final String SERVICE_COMPONENT_NAME_OVERRIDE = "service_component_name_override"; public static final String EMPTY = "''"; diff --git a/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/Appconfig.java b/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/Appconfig.java index 5b3cdc6..d0f5784 100644 --- a/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/Appconfig.java +++ b/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/Appconfig.java @@ -5,6 +5,8 @@ * * ================================================================================ * * Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. * * ================================================================================ + * * Modifications Copyright (c) 2021 Nokia + * * ================================================================================ * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. * * You may obtain a copy of the License at @@ -43,9 +45,9 @@ public class Appconfig { private Calls[] services_calls; - private Map streams_publishes; + private Map streams_publishes; - private Map streams_subscribes; + private Map streams_subscribes; private Map params; diff --git a/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/BaseStream.java b/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/BaseStream.java new file mode 100644 index 0000000..310f9a0 --- /dev/null +++ b/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/BaseStream.java @@ -0,0 +1,37 @@ +/* + * + * * ============LICENSE_START======================================================= + * * org.onap.dcae + * * ================================================================================ + * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END========================================================= + * + * + */ + + +package org.onap.blueprintgenerator.model.common; + +/** + * @author : Tomasz Wrobel + * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator + * Applications Common Model: An interface with common stream field + */ + +public interface BaseStream { + + String getType(); + +} diff --git a/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/Dmaap.java b/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/Dmaap.java index 7ed79b3..9c37773 100644 --- a/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/Dmaap.java +++ b/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/Dmaap.java @@ -5,6 +5,8 @@ * * ================================================================================ * * Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. * * ================================================================================ + * * Modifications Copyright (c) 2021 Nokia + * * ================================================================================ * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. * * You may obtain a copy of the License at @@ -29,13 +31,14 @@ import lombok.Data; /** * @author : Ravi Mantena - * @date 10/16/2020 Application: DCAE/ONAP - Blueprint Generator Common Module: Used by both ONAP - * and DCAE Blueprint Applications Common Model: A model class which represents Dmaap + * @date 10/16/2020 Application: DCAE/ONAP - Blueprint Generator Common Module: Used by both ONAP and DCAE Blueprint + * Applications Common Model: A model class which represents Dmaap */ + @Data @JsonInclude(value = JsonInclude.Include.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) -public class Dmaap { +public class Dmaap implements BaseStream { private Object dmaap_info; @@ -45,4 +48,5 @@ public class Dmaap { private GetInput pass; private GetInput user; + } diff --git a/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/service/base/BlueprintHelperService.java b/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/service/base/BlueprintHelperService.java index db15360..c9759d0 100644 --- a/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/service/base/BlueprintHelperService.java +++ b/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/service/base/BlueprintHelperService.java @@ -5,6 +5,8 @@ * * ================================================================================ * * Copyright (c) 2020 AT&T Intellectual Property. All rights reserved. * * ================================================================================ + * * Modifications Copyright (c) 2021 Nokia + * * ================================================================================ * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. * * You may obtain a copy of the License at @@ -201,6 +203,16 @@ public class BlueprintHelperService { return type.equals(Constants.MESSAGE_ROUTER) || type.equals(Constants.MESSAGEROUTER_VALUE); } + /** + * Returns if the type is Kafka or not + * + * @param type Input Type + * @return + */ + public boolean isKafkaStreamType(String type) { + return type.equals(Constants.KAFKA_TYPE); + } + /** * Returns name with underscore for empty input * diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/AppConfigService.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/AppConfigService.java index f0d0e50..3db5ea0 100644 --- a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/AppConfigService.java +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/AppConfigService.java @@ -27,13 +27,11 @@ package org.onap.blueprintgenerator.service.common; import org.onap.blueprintgenerator.constants.Constants; import org.onap.blueprintgenerator.model.common.Appconfig; -import org.onap.blueprintgenerator.model.common.Dmaap; +import org.onap.blueprintgenerator.model.common.BaseStream; import org.onap.blueprintgenerator.model.common.GetInput; import org.onap.blueprintgenerator.model.componentspec.OnapComponentSpec; import org.onap.blueprintgenerator.model.componentspec.common.Calls; import org.onap.blueprintgenerator.model.componentspec.common.Parameters; -import org.onap.blueprintgenerator.model.componentspec.common.Publishes; -import org.onap.blueprintgenerator.model.componentspec.common.Subscribes; import org.onap.blueprintgenerator.service.base.BlueprintHelperService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -60,6 +58,7 @@ public class AppConfigService { @Autowired private StreamService streamService; + /** * Creates Inputs section under App Config with Publishes, Subscribes, Parameters sections by * checking Datarouter/MessageRouter/override/Dmaap values @@ -82,9 +81,9 @@ public class AppConfigService { Calls[] call = new Calls[0]; appconfig.setService_calls(call); - Map streamPublishes = streamService.createStreamPublishes( + Map streamPublishes = streamService.createStreamPublishes( onapComponentSpec, blueprintHelperService, dmaapService, inputs, isDmaap); - Map streamSubscribes = streamService.createStreamSubscribes( + Map streamSubscribes = streamService.createStreamSubscribes( onapComponentSpec, blueprintHelperService, dmaapService, inputs, isDmaap); appconfig.setStreams_publishes(streamPublishes); @@ -132,4 +131,5 @@ public class AppConfigService { response.put("inputs", inputs); return response; } + } diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java index bd4cf87..f27ea48 100644 --- a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java @@ -29,39 +29,45 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.TreeMap; import org.onap.blueprintgenerator.constants.Constants; +import org.onap.blueprintgenerator.model.common.BaseStream; import org.onap.blueprintgenerator.model.common.Dmaap; import org.onap.blueprintgenerator.model.componentspec.OnapComponentSpec; import org.onap.blueprintgenerator.model.componentspec.common.Publishes; import org.onap.blueprintgenerator.model.componentspec.common.Subscribes; import org.onap.blueprintgenerator.service.base.BlueprintHelperService; +import org.onap.blueprintgenerator.service.common.kafka.KafkaStreamService; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author : Joanna Jeremicz - * @date 01/15/2021 Application: ONAP - Blueprint Generator Common ONAP Service - * to create publishes and subscribes streams + * @date 01/15/2021 Application: ONAP - Blueprint Generator Common ONAP Service to create publishes and subscribes + * streams */ @Service("streamService") public class StreamService { + @Autowired + private KafkaStreamService kafkaStreamsService; + /** * Creates publishes stream for given Inputs and ComponentSpec * - * @param onapComponentSpec Onap Component Specification + * @param onapComponentSpec Onap Component Specification * @param blueprintHelperService Blueprint Helper Service - * @param dmaapService Dmaap Service - * @param inputs Inputs - * @param isDmaap Dmaap Argument + * @param dmaapService Dmaap Service + * @param inputs Inputs + * @param isDmaap Dmaap Argument * @return */ - public Map createStreamPublishes( + public Map createStreamPublishes( OnapComponentSpec onapComponentSpec, BlueprintHelperService blueprintHelperService, DmaapService dmaapService, Map> inputs, boolean isDmaap) { - Map streamPublishes = new TreeMap<>(); + Map streamPublishes = new TreeMap<>(); if (onapComponentSpec.getStreams() == null || onapComponentSpec.getStreams().getPublishes() == null) { return streamPublishes; } @@ -90,6 +96,9 @@ public class StreamService { Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap"); dmaap.setType(publishes.getType()); streamPublishes.put(config, dmaap); + } else if (blueprintHelperService.isKafkaStreamType(publishes.getType())) { + inputs.putAll(kafkaStreamsService.createStreamPublishInputs(publishes.getConfig_key())); + streamPublishes.putAll(kafkaStreamsService.createAppPropertiesPublish(publishes.getConfig_key())); } } return streamPublishes; @@ -98,21 +107,21 @@ public class StreamService { /** * Creates subscribes stream for given Inputs and ComponentSpec * - * @param onapComponentSpec Onap Component Specification + * @param onapComponentSpec Onap Component Specification * @param blueprintHelperService Blueprint Helper Service - * @param dmaapService Dmaap Service - * @param inputs Inputs - * @param isDmaap Dmaap Argument + * @param dmaapService Dmaap Service + * @param inputs Inputs + * @param isDmaap Dmaap Argument * @return */ - public Map createStreamSubscribes( + public Map createStreamSubscribes( OnapComponentSpec onapComponentSpec, BlueprintHelperService blueprintHelperService, DmaapService dmaapService, Map> inputs, boolean isDmaap) { - Map streamSubscribes = new TreeMap<>(); + Map streamSubscribes = new TreeMap<>(); if (onapComponentSpec.getStreams() == null || onapComponentSpec.getStreams().getSubscribes() == null) { return streamSubscribes; } @@ -141,7 +150,12 @@ public class StreamService { Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap"); dmaap.setType(subscribes.getType()); streamSubscribes.put(config, dmaap); + } else if (blueprintHelperService.isKafkaStreamType(subscribes.getType())) { + inputs.putAll(kafkaStreamsService.createStreamSubscribeInputs(subscribes.getConfig_key())); + streamSubscribes.putAll(kafkaStreamsService.createAppPropertiesSubscribe(subscribes.getConfig_key())); } + + } return streamSubscribes; } diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/AafCredential.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/AafCredential.java new file mode 100644 index 0000000..072a54c --- /dev/null +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/AafCredential.java @@ -0,0 +1,52 @@ +/* + * + * * ============LICENSE_START======================================================= + * * org.onap.dcae + * * ================================================================================ + * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END========================================================= + * + * + */ + +package org.onap.blueprintgenerator.service.common.kafka; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import lombok.Data; +import org.onap.blueprintgenerator.model.common.GetInput; + +/** + * @author : Tomasz Wrobel + * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator + * Applications Common Model: A model class which represents AafCredential + */ + +@Data +@JsonInclude(value = Include.NON_NULL) +public class AafCredential { + + private GetInput username; + + private GetInput password; + + public AafCredential(String usernameInput, String passwordInput) { + + this.username = new GetInput(usernameInput); + + this.password = new GetInput(passwordInput); + + } +} diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaCommonConstants.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaCommonConstants.java new file mode 100644 index 0000000..0a040be --- /dev/null +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaCommonConstants.java @@ -0,0 +1,39 @@ +/* + * + * * ============LICENSE_START======================================================= + * * org.onap.dcae + * * ================================================================================ + * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END========================================================= + * + * + */ + +package org.onap.blueprintgenerator.service.common.kafka; + +/** + * @author : Tomasz Wrobel + * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator + * Class which contains Kafka Constants + */ + +public class KafkaCommonConstants { + + public static final String KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME = "kafka_bootstrap_servers"; + + public static final String AFF_KAFKA_USER_INPUT_NAME = "kafka_username"; + public static final String AAF_KAFKA_PASSWORD_INPUT_NAME = "kafka_password"; + +} diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaInfo.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaInfo.java new file mode 100644 index 0000000..c03eaf0 --- /dev/null +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaInfo.java @@ -0,0 +1,56 @@ +/* + * + * * ============LICENSE_START======================================================= + * * org.onap.dcae + * * ================================================================================ + * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END========================================================= + * + * + */ + +package org.onap.blueprintgenerator.service.common.kafka; + +import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import lombok.Data; +import org.onap.blueprintgenerator.model.common.GetInput; + + +/** + * @author : Tomasz Wrobel + * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator + * Applications Common Model: A model class which represents Kafka Info + */ + +@Data +@JsonInclude(value = Include.NON_NULL) +public class KafkaInfo { + + private GetInput bootstrap_servers; + + private GetInput topic_name; + + public KafkaInfo(String topicName) { + + this.bootstrap_servers = new GetInput(KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME); + + this.topic_name = new GetInput(topicName); + + } + +} diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStream.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStream.java new file mode 100644 index 0000000..ca92f69 --- /dev/null +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStream.java @@ -0,0 +1,61 @@ +/* + * + * * ============LICENSE_START======================================================= + * * org.onap.dcae + * * ================================================================================ + * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END========================================================= + * + * + */ + + +package org.onap.blueprintgenerator.service.common.kafka; + +import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AAF_KAFKA_PASSWORD_INPUT_NAME; +import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AFF_KAFKA_USER_INPUT_NAME; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.Data; +import org.onap.blueprintgenerator.model.common.BaseStream; + + +/** + * @author : Tomasz Wrobel + * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator + * Applications Common Model: A model class which represents Kafka Stream + */ + +@Data +@JsonInclude(value = JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class KafkaStream implements BaseStream { + + private final String type = "kafka"; + + @JsonProperty("aaf_credentials") + private AafCredential aafCredential; + + @JsonProperty("kafka_info") + private KafkaInfo kafkaInfo; + + public KafkaStream(String topicName) { + this.aafCredential = new AafCredential(AFF_KAFKA_USER_INPUT_NAME, AAF_KAFKA_PASSWORD_INPUT_NAME); + this.kafkaInfo = new KafkaInfo(topicName); + } +} diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamService.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamService.java new file mode 100644 index 0000000..2090ef0 --- /dev/null +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamService.java @@ -0,0 +1,137 @@ +/* + * + * * ============LICENSE_START======================================================= + * * org.onap.dcae + * * ================================================================================ + * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END========================================================= + * + * + */ + +package org.onap.blueprintgenerator.service.common.kafka; + + +import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AAF_KAFKA_PASSWORD_INPUT_NAME; +import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AFF_KAFKA_USER_INPUT_NAME; +import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME; + +import java.util.LinkedHashMap; +import java.util.Map; +import org.onap.blueprintgenerator.service.base.BlueprintHelperService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * @author : Tomasz Wrobel + * @date 18/01/2021 Application: ONAP - Blueprint Generator Common ONAP Service used to create Kafka Stream application + * config object and Kafka Stream inputs + */ +@Service +public class KafkaStreamService { + + private static final String PUBLISH_URL_SUFFIX = "_publish_url"; + private static final String SUBSCRIBE_URL_SUFFIX = "_subscribe_url"; + private static final String DEFAULT_STREAM_URL = "sample_stream_url"; + private static final String DEFAULT_BOOTSTRAP_SERVER = "message-router-kafka:9092"; + private static final String DEFAULT_AAF_USER = "admin"; + private static final String DEFAULT_AAF_PASSWORD = "admin_secret"; + + @Autowired + private BlueprintHelperService blueprintHelperService; + + + /** + * Creates publish stream inputs for given streamName + * + * @param streamName Stream name + * @return + */ + public Map> createStreamPublishInputs(String streamName) { + return createStreamInputs(streamName + PUBLISH_URL_SUFFIX); + } + + /** + * Creates subscribe stream inputs for given streamName + * + * @param streamName Stream name + * @return + */ + public Map> createStreamSubscribeInputs(String streamName) { + return createStreamInputs(streamName + SUBSCRIBE_URL_SUFFIX); + } + + /** + * Creates Application properties publish stream object for given streamName + * + * @param streamName Stream name + * @return + */ + public Map createAppPropertiesPublish(String streamName) { + + LinkedHashMap kafkaStreamMap = new LinkedHashMap<>(); + KafkaStream kafkaStream = createAppProperties(streamName, PUBLISH_URL_SUFFIX); + + kafkaStreamMap.put(streamName, kafkaStream); + + return kafkaStreamMap; + } + + /** + * Creates Application properties subscribe stream object for given streamName + * + * @param streamName Stream name + * @return + */ + public Map createAppPropertiesSubscribe(String streamName) { + + LinkedHashMap kafkaStreamMap = new LinkedHashMap<>(); + KafkaStream kafkaStream = createAppProperties(streamName, SUBSCRIBE_URL_SUFFIX); + + kafkaStreamMap.put(streamName, kafkaStream); + + return kafkaStreamMap; + } + + private KafkaStream createAppProperties(String streamName, String urlSuffix) { + String topicName = streamName + urlSuffix; + + return new KafkaStream(topicName); + } + + private Map> createStreamInputs(String streamName) { + LinkedHashMap> streamInputs = createBaseInputs(); + LinkedHashMap stream = + blueprintHelperService.createStringInput(DEFAULT_STREAM_URL); + streamInputs.put(streamName, stream); + return streamInputs; + } + + private LinkedHashMap> createBaseInputs() { + LinkedHashMap> baseInputs = new LinkedHashMap<>(); + + LinkedHashMap kafka_message_router = blueprintHelperService + .createStringInput(DEFAULT_BOOTSTRAP_SERVER); + baseInputs.put(KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME, kafka_message_router); + + LinkedHashMap kafka_username = blueprintHelperService.createStringInput(DEFAULT_AAF_USER); + baseInputs.put(AFF_KAFKA_USER_INPUT_NAME, kafka_username); + + LinkedHashMap kafka_password = blueprintHelperService.createStringInput(DEFAULT_AAF_PASSWORD); + baseInputs.put(AAF_KAFKA_PASSWORD_INPUT_NAME, kafka_password); + + return baseInputs; + } +} diff --git a/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/StreamServiceTest.java b/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/StreamServiceTest.java index 09bb176..fe93f5f 100644 --- a/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/StreamServiceTest.java +++ b/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/StreamServiceTest.java @@ -35,6 +35,7 @@ import java.util.LinkedHashMap; import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.onap.blueprintgenerator.model.common.BaseStream; import org.onap.blueprintgenerator.model.common.Dmaap; import org.onap.blueprintgenerator.model.componentspec.OnapComponentSpec; import org.onap.blueprintgenerator.model.componentspec.common.Publishes; @@ -66,7 +67,7 @@ class StreamServiceTest { void whenStreamsIsNullCreateStreamPublishesShouldReturnEmptyMap() { when(onapComponentSpecMock.getStreams()).thenReturn(null); - Map streamPublishes = streamService.createStreamPublishes( + Map streamPublishes = streamService.createStreamPublishes( onapComponentSpecMock, blueprintHelperServiceMock, dmaapServiceMock, @@ -81,7 +82,7 @@ class StreamServiceTest { when(streamsMock.getPublishes()).thenReturn(null); when(onapComponentSpecMock.getStreams()).thenReturn(streamsMock); - Map streamPublishes = streamService.createStreamPublishes( + Map streamPublishes = streamService.createStreamPublishes( onapComponentSpecMock, blueprintHelperServiceMock, dmaapServiceMock, @@ -99,21 +100,22 @@ class StreamServiceTest { DmaapService dmaapService = new DmaapService(); - Map streamPublishes = streamService.createStreamPublishes( + Map streamPublishes = streamService.createStreamPublishes( onapComponentSpecMock, blueprintHelperServiceMock, dmaapService, createInputs(), true); - Map expectedMap = createExpectedMap("_feed"); + Map expectedMap = createExpectedMap("_feed"); assertNotNull(streamPublishes); assertEquals(expectedMap.size(), streamPublishes.size()); - for(Map.Entry entry : expectedMap.entrySet()) { + for (Map.Entry entry : expectedMap.entrySet()) { assertTrue(streamPublishes.containsKey(entry.getKey())); - assertTrue(streamPublishes.get(entry.getKey()).getType().equals(entry.getValue().getType())); - assertTrue(streamPublishes.get(entry.getKey()).getDmaap_info().equals(entry.getValue().getDmaap_info())); + assertEquals(streamPublishes.get(entry.getKey()).getType(), entry.getValue().getType()); + assertEquals(((Dmaap) streamPublishes.get(entry.getKey())).getDmaap_info(), + ((Dmaap) entry.getValue()).getDmaap_info()); } } @@ -125,21 +127,22 @@ class StreamServiceTest { DmaapService dmaapService = new DmaapService(); - Map streamPublishes = streamService.createStreamPublishes( + Map streamPublishes = streamService.createStreamPublishes( onapComponentSpecMock, blueprintHelperServiceMock, dmaapService, createInputs(), true); - Map expectedMap = createExpectedMap("_topic"); + Map expectedMap = createExpectedMap("_topic"); assertNotNull(streamPublishes); assertEquals(expectedMap.size(), streamPublishes.size()); - for(Map.Entry entry : expectedMap.entrySet()) { + for (Map.Entry entry : expectedMap.entrySet()) { assertTrue(streamPublishes.containsKey(entry.getKey())); - assertTrue(streamPublishes.get(entry.getKey()).getType().equals(entry.getValue().getType())); - assertTrue(streamPublishes.get(entry.getKey()).getDmaap_info().equals(entry.getValue().getDmaap_info())); + assertEquals(streamPublishes.get(entry.getKey()).getType(), entry.getValue().getType()); + assertEquals(((Dmaap) streamPublishes.get(entry.getKey())).getDmaap_info(), + ((Dmaap) entry.getValue()).getDmaap_info()); } } @@ -147,7 +150,7 @@ class StreamServiceTest { void whenStreamsIsNullCreateStreamSubscribesShouldReturnEmptyMap() { when(onapComponentSpecMock.getStreams()).thenReturn(null); - Map streamSubscribes = streamService.createStreamSubscribes( + Map streamSubscribes = streamService.createStreamSubscribes( onapComponentSpecMock, blueprintHelperServiceMock, dmaapServiceMock, @@ -162,7 +165,7 @@ class StreamServiceTest { when(streamsMock.getPublishes()).thenReturn(null); when(onapComponentSpecMock.getStreams()).thenReturn(streamsMock); - Map streamSubscribes = streamService.createStreamSubscribes( + Map streamSubscribes = streamService.createStreamSubscribes( onapComponentSpecMock, blueprintHelperServiceMock, dmaapServiceMock, @@ -180,21 +183,22 @@ class StreamServiceTest { DmaapService dmaapService = new DmaapService(); - Map streamSubscribes = streamService.createStreamSubscribes( + Map streamSubscribes = streamService.createStreamSubscribes( onapComponentSpecMock, blueprintHelperServiceMock, dmaapService, createInputs(), true); - Map expectedMap = createExpectedMap("_feed"); + Map expectedMap = createExpectedMap("_feed"); assertNotNull(streamSubscribes); assertEquals(expectedMap.size(), streamSubscribes.size()); - for(Map.Entry entry : expectedMap.entrySet()) { + for (Map.Entry entry : expectedMap.entrySet()) { assertTrue(streamSubscribes.containsKey(entry.getKey())); - assertTrue(streamSubscribes.get(entry.getKey()).getType().equals(entry.getValue().getType())); - assertTrue(streamSubscribes.get(entry.getKey()).getDmaap_info().equals(entry.getValue().getDmaap_info())); + assertEquals(streamSubscribes.get(entry.getKey()).getType(), entry.getValue().getType()); + assertEquals(((Dmaap) streamSubscribes.get(entry.getKey())).getDmaap_info(), + ((Dmaap) entry.getValue()).getDmaap_info()); } } @@ -206,26 +210,27 @@ class StreamServiceTest { DmaapService dmaapService = new DmaapService(); - Map streamSubscribes = streamService.createStreamSubscribes( + Map streamSubscribes = streamService.createStreamSubscribes( onapComponentSpecMock, blueprintHelperServiceMock, dmaapService, createInputs(), true); - Map expectedMap = createExpectedMap("_topic"); + Map expectedMap = createExpectedMap("_topic"); assertNotNull(streamSubscribes); assertEquals(expectedMap.size(), streamSubscribes.size()); - for(Map.Entry entry : expectedMap.entrySet()) { + for (Map.Entry entry : expectedMap.entrySet()) { assertTrue(streamSubscribes.containsKey(entry.getKey())); - assertTrue(streamSubscribes.get(entry.getKey()).getType().equals(entry.getValue().getType())); - assertTrue(streamSubscribes.get(entry.getKey()).getDmaap_info().equals(entry.getValue().getDmaap_info())); + assertEquals(streamSubscribes.get(entry.getKey()).getType(), entry.getValue().getType()); + assertEquals(((Dmaap) streamSubscribes.get(entry.getKey())).getDmaap_info(), + ((Dmaap) entry.getValue()).getDmaap_info()); } } - private Map createExpectedMap(String suffix) { - Map expectedMap = new HashMap<>(); + private Map createExpectedMap(String suffix) { + Map expectedMap = new HashMap<>(); Dmaap dmaap1 = new Dmaap(); dmaap1.setType("t1"); dmaap1.setDmaap_info("<>"); @@ -260,21 +265,21 @@ class StreamServiceTest { return new Subscribes[]{sub1, sub2, sub3}; } - private Publishes createPublishes(String key, String type){ + private Publishes createPublishes(String key, String type) { Publishes publishes = new Publishes(); publishes.setConfig_key(key); publishes.setType(type); return publishes; } - private Subscribes createSubscribes(String key, String type){ + private Subscribes createSubscribes(String key, String type) { Subscribes subscribes = new Subscribes(); subscribes.setConfig_key(key); subscribes.setType(type); return subscribes; } - private Map> createInputs(){ + private Map> createInputs() { LinkedHashMap map = new LinkedHashMap<>(); map.put("key-1", "obj-1"); diff --git a/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamServiceTest.java b/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamServiceTest.java new file mode 100644 index 0000000..cad3b71 --- /dev/null +++ b/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamServiceTest.java @@ -0,0 +1,125 @@ +/* + * + * * ============LICENSE_START======================================================= + * * org.onap.dcae + * * ================================================================================ + * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END========================================================= + * + * + */ + +package org.onap.blueprintgenerator.service.common.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.LinkedHashMap; +import java.util.Map; +import org.junit.Test; + +import org.junit.runner.RunWith; +import org.onap.blueprintgenerator.service.base.BlueprintHelperService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.ConfigFileApplicationContextInitializer; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = {KafkaStreamService.class, BlueprintHelperService.class}, + initializers = ConfigFileApplicationContextInitializer.class) +public class KafkaStreamServiceTest { + + private static final String TEST_STREAM_NAME = "test_stream_name"; + private static final String PUBLISH_URL_SUFFIX = "_publish_url"; + private static final String SUBSCRIBE_URL_SUFFIX = "_subscribe_url"; + private static final String DEFAULT_KEY = "default"; + private static final String KAFKA_TYPE = "kafka"; + + @Autowired + private KafkaStreamService kafkaStreamService; + + @Test + public void createCorrectStreamCommonInputs() { + + Map> publishInputs = kafkaStreamService + .createStreamPublishInputs("test_stream_name"); + + LinkedHashMap kafka_bootstrap_servers = publishInputs.get("kafka_bootstrap_servers"); + LinkedHashMap kafka_username = publishInputs.get("kafka_username"); + LinkedHashMap kafka_password = publishInputs.get("kafka_password"); + + assertNotNull(kafka_bootstrap_servers); + assertNotNull(kafka_username); + assertNotNull(kafka_password); + + assertNotNull(kafka_bootstrap_servers.get(DEFAULT_KEY)); + assertNotNull(kafka_username.get(DEFAULT_KEY)); + assertNotNull(kafka_password.get(DEFAULT_KEY)); + } + + @Test + public void createCorrectStreamPublishInput() { + Map> publishInputs = kafkaStreamService + .createStreamPublishInputs(TEST_STREAM_NAME); + + LinkedHashMap kafka_stream_name = publishInputs.get(TEST_STREAM_NAME + PUBLISH_URL_SUFFIX); + + assertNotNull(kafka_stream_name); + + assertNotNull(kafka_stream_name.get(DEFAULT_KEY)); + } + + @Test + public void createCorrectStreamSubscribeInput() { + Map> publishInputs = kafkaStreamService + .createStreamSubscribeInputs(TEST_STREAM_NAME); + + LinkedHashMap kafka_stream_name = publishInputs.get(TEST_STREAM_NAME + SUBSCRIBE_URL_SUFFIX); + + assertNotNull(kafka_stream_name); + + assertNotNull(kafka_stream_name.get(DEFAULT_KEY)); + } + + @Test + public void createCorrectPublishAppConfig() { + Map appPropertiesPublish = kafkaStreamService + .createAppPropertiesPublish(TEST_STREAM_NAME); + + KafkaStream kafkaStream = appPropertiesPublish.get(TEST_STREAM_NAME); + + assertEquals(KAFKA_TYPE, kafkaStream.getType()); + assertNotNull(kafkaStream.getAafCredential()); + assertNotNull(kafkaStream.getKafkaInfo()); + assertTrue(kafkaStream.getKafkaInfo().toString().contains(TEST_STREAM_NAME + PUBLISH_URL_SUFFIX)); + + } + + @Test + public void createCorrectSubscribeAppConfig() { + Map appPropertiesSubscribe = kafkaStreamService + .createAppPropertiesSubscribe(TEST_STREAM_NAME); + + KafkaStream kafkaStream = appPropertiesSubscribe.get(TEST_STREAM_NAME); + + assertEquals(KAFKA_TYPE, kafkaStream.getType()); + assertNotNull(kafkaStream.getAafCredential()); + assertNotNull(kafkaStream.getKafkaInfo()); + assertTrue(kafkaStream.getKafkaInfo().toString().contains(TEST_STREAM_NAME + SUBSCRIBE_URL_SUFFIX)); + + } +} diff --git a/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamTest.java b/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamTest.java new file mode 100644 index 0000000..2ba2274 --- /dev/null +++ b/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamTest.java @@ -0,0 +1,98 @@ +/* + * + * * ============LICENSE_START======================================================= + * * org.onap.dcae + * * ================================================================================ + * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. + * * ================================================================================ + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END========================================================= + * + * + */ + +package org.onap.blueprintgenerator.service.common.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import org.junit.Before; +import org.junit.Test; + +public class KafkaStreamTest { + + private final static String TEST_TOPIC_NAME = "test_topic"; + private static final String GET_INPUT_KAFKA_USERNAME = "{\"get_input\":\"kafka_username\"}"; + private static final String GET_INPUT_KAFKA_PASSWORD = "{\"get_input\":\"kafka_password\"}"; + private static final String AAF_USERNAME = "username"; + private static final String AAF_PASSWORD = "password"; + private static final String AAF_CREDENTIAL_NODE = "aaf_credentials"; + private static final String KAFKA_TYPE_NODE = "type"; + private static final String EXPECTED_KAFKA_TYPE = "\"kafka\""; + private static final String KAFKA_BOOTSTRAP_SERVERS = "bootstrap_servers"; + private static final String KAFKA_TOPIC_NAME = "topic_name"; + private static final String EXPECTED_GET_INPUT_TOPIC = "{\"get_input\":\"" + TEST_TOPIC_NAME + "\"}"; + private static final String EXPECTED_GET_INPUT_BOOTSTRAP_SERVERS = "{\"get_input\":\"kafka_bootstrap_servers\"}"; + private static final String KAFKA_INFO_NODE = "kafka_info"; + + private KafkaStream kafkaStream; + private ObjectMapper mapper = new ObjectMapper(); + + @Before + public void setUp() { + kafkaStream = new KafkaStream(TEST_TOPIC_NAME); + } + + @Test + public void kafkaStreamHasCorrectAafCredential() throws IOException { + + String kafkaStreamJson = mapper.writeValueAsString(kafkaStream); + + JsonNode kafkaStreamNode = mapper.readTree(kafkaStreamJson); + JsonNode aafCredential = kafkaStreamNode.get(AAF_CREDENTIAL_NODE); + + assertNotNull(aafCredential); + assertEquals(GET_INPUT_KAFKA_USERNAME, aafCredential.get(AAF_USERNAME).toString()); + assertEquals(GET_INPUT_KAFKA_PASSWORD, aafCredential.get(AAF_PASSWORD).toString()); + } + + @Test + public void kafkaStreamHasCorrectKafkaInfo() throws IOException { + + String kafkaStreamJson = mapper.writeValueAsString(kafkaStream); + + JsonNode kafkaStreamNode = mapper.readTree(kafkaStreamJson); + JsonNode kafkaInfo = kafkaStreamNode.get(KAFKA_INFO_NODE); + + assertNotNull(kafkaInfo); + assertEquals(EXPECTED_GET_INPUT_BOOTSTRAP_SERVERS, kafkaInfo.get(KAFKA_BOOTSTRAP_SERVERS).toString()); + assertEquals(EXPECTED_GET_INPUT_TOPIC, kafkaInfo.get(KAFKA_TOPIC_NAME).toString()); + + } + + @Test + public void kafkaStreamHasCorrectType() throws IOException { + + String kafkaStreamJson = mapper.writeValueAsString(kafkaStream); + + JsonNode kafkaStreamNode = mapper.readTree(kafkaStreamJson); + JsonNode kafkaType = kafkaStreamNode.get(KAFKA_TYPE_NODE); + + assertNotNull(kafkaType); + assertEquals(EXPECTED_KAFKA_TYPE, kafkaType.toString()); + } + +} -- cgit 1.2.3-korg