From 52b5f5276add6116a14ba671c35ff9d87c140421 Mon Sep 17 00:00:00 2001 From: Tomasz Wrobel Date: Fri, 15 Jan 2021 14:34:18 +0100 Subject: Add Native Kafka streams support in bp-generator Issue-ID: DCAEGEN2-1179 Signed-off-by: Tomasz Wrobel Change-Id: I541dca959707a41c56205e20c9f5a56ccec5ca41 --- .../service/common/StreamService.java | 42 ++++++++++++++-------- 1 file changed, 28 insertions(+), 14 deletions(-) (limited to 'mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java') diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java index bd4cf87..f27ea48 100644 --- a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java @@ -29,39 +29,45 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.TreeMap; import org.onap.blueprintgenerator.constants.Constants; +import org.onap.blueprintgenerator.model.common.BaseStream; import org.onap.blueprintgenerator.model.common.Dmaap; import org.onap.blueprintgenerator.model.componentspec.OnapComponentSpec; import org.onap.blueprintgenerator.model.componentspec.common.Publishes; import org.onap.blueprintgenerator.model.componentspec.common.Subscribes; import org.onap.blueprintgenerator.service.base.BlueprintHelperService; +import org.onap.blueprintgenerator.service.common.kafka.KafkaStreamService; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author : Joanna Jeremicz - * @date 01/15/2021 Application: ONAP - Blueprint Generator Common ONAP Service - * to create publishes and subscribes streams + * @date 01/15/2021 Application: ONAP - Blueprint Generator Common ONAP Service to create publishes and subscribes + * streams */ @Service("streamService") public class StreamService { + @Autowired + private KafkaStreamService kafkaStreamsService; + /** * Creates publishes stream for given Inputs and ComponentSpec * - * @param onapComponentSpec Onap Component Specification + * @param onapComponentSpec Onap Component Specification * @param blueprintHelperService Blueprint Helper Service - * @param dmaapService Dmaap Service - * @param inputs Inputs - * @param isDmaap Dmaap Argument + * @param dmaapService Dmaap Service + * @param inputs Inputs + * @param isDmaap Dmaap Argument * @return */ - public Map createStreamPublishes( + public Map createStreamPublishes( OnapComponentSpec onapComponentSpec, BlueprintHelperService blueprintHelperService, DmaapService dmaapService, Map> inputs, boolean isDmaap) { - Map streamPublishes = new TreeMap<>(); + Map streamPublishes = new TreeMap<>(); if (onapComponentSpec.getStreams() == null || onapComponentSpec.getStreams().getPublishes() == null) { return streamPublishes; } @@ -90,6 +96,9 @@ public class StreamService { Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap"); dmaap.setType(publishes.getType()); streamPublishes.put(config, dmaap); + } else if (blueprintHelperService.isKafkaStreamType(publishes.getType())) { + inputs.putAll(kafkaStreamsService.createStreamPublishInputs(publishes.getConfig_key())); + streamPublishes.putAll(kafkaStreamsService.createAppPropertiesPublish(publishes.getConfig_key())); } } return streamPublishes; @@ -98,21 +107,21 @@ public class StreamService { /** * Creates subscribes stream for given Inputs and ComponentSpec * - * @param onapComponentSpec Onap Component Specification + * @param onapComponentSpec Onap Component Specification * @param blueprintHelperService Blueprint Helper Service - * @param dmaapService Dmaap Service - * @param inputs Inputs - * @param isDmaap Dmaap Argument + * @param dmaapService Dmaap Service + * @param inputs Inputs + * @param isDmaap Dmaap Argument * @return */ - public Map createStreamSubscribes( + public Map createStreamSubscribes( OnapComponentSpec onapComponentSpec, BlueprintHelperService blueprintHelperService, DmaapService dmaapService, Map> inputs, boolean isDmaap) { - Map streamSubscribes = new TreeMap<>(); + Map streamSubscribes = new TreeMap<>(); if (onapComponentSpec.getStreams() == null || onapComponentSpec.getStreams().getSubscribes() == null) { return streamSubscribes; } @@ -141,7 +150,12 @@ public class StreamService { Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap"); dmaap.setType(subscribes.getType()); streamSubscribes.put(config, dmaap); + } else if (blueprintHelperService.isKafkaStreamType(subscribes.getType())) { + inputs.putAll(kafkaStreamsService.createStreamSubscribeInputs(subscribes.getConfig_key())); + streamSubscribes.putAll(kafkaStreamsService.createAppPropertiesSubscribe(subscribes.getConfig_key())); } + + } return streamSubscribes; } -- cgit 1.2.3-korg