From fbf94a6505d7c4b7c009446ca7dca93a9ce0db4d Mon Sep 17 00:00:00 2001 From: rameshiyer27 Date: Mon, 11 Dec 2023 18:50:14 +0000 Subject: Add kafka support on drools-pdp Issue-ID: POLICY-4201 Signed-off-by: zrrmmua Change-Id: I7f9ebec13cc41c214a400087f99e62bbc895abdd --- .../main/feature/config/feature-legacy-config.properties | 12 ++++++++++++ .../src/main/feature/config/feature-lifecycle.properties | 16 ++++++++++++++++ .../controller/IndexedDroolsControllerFactory.java | 6 ++++++ .../onap/policy/drools/server/restful/RestManager.java | 6 ++++++ 4 files changed, 40 insertions(+) diff --git a/feature-legacy-config/src/main/feature/config/feature-legacy-config.properties b/feature-legacy-config/src/main/feature/config/feature-legacy-config.properties index af5bd15c..15a35b8b 100644 --- a/feature-legacy-config/src/main/feature/config/feature-legacy-config.properties +++ b/feature-legacy-config/src/main/feature/config/feature-legacy-config.properties @@ -26,3 +26,15 @@ dmaap.source.topics.PDPD-CONFIGURATION.consumerGroup=${envd:PDPD_CONFIGURATION_C dmaap.source.topics.PDPD-CONFIGURATION.consumerInstance=${envd:PDPD_CONFIGURATION_CONSUMER_INSTANCE} dmaap.source.topics.PDPD-CONFIGURATION.managed=false dmaap.source.topics.PDPD-CONFIGURATION.https=${envd:DMAAP_HTTPS:true} + +#Replace the properties with the following to use kafka message broker +#kafka.source.topics=pdpd-configuration +#kafka.source.topics.fetchTimeout=15000 +#kafka.source.topics.pdpd-configuration.servers=${envd:DMAAP_SERVERS} +#kafka.source.topics.pdpd-configuration.effectiveTopic=${envd:PDPD_CONFIGURATION_TOPIC} +#kafka.source.topics.pdpd-configuration.apiKey=${envd:PDPD_CONFIGURATION_API_KEY} +#kafka.source.topics.pdpd-configuration.apiSecret=${envd:PDPD_CONFIGURATION_API_SECRET} +#kafka.source.topics.pdpd-configuration.consumerGroup=${envd:PDPD_CONFIGURATION_CONSUMER_GROUP} +#kafka.source.topics.pdpd-configuration.consumerInstance=${envd:PDPD_CONFIGURATION_CONSUMER_INSTANCE} +#kafka.source.topics.pdpd-configuration.managed=false +#kafka.source.topics.pdpd-configuration.https=${envd:DMAAP_HTTPS:true} diff --git a/feature-lifecycle/src/main/feature/config/feature-lifecycle.properties b/feature-lifecycle/src/main/feature/config/feature-lifecycle.properties index d79c9e50..27799543 100644 --- a/feature-lifecycle/src/main/feature/config/feature-lifecycle.properties +++ b/feature-lifecycle/src/main/feature/config/feature-lifecycle.properties @@ -36,3 +36,19 @@ dmaap.sink.topics.POLICY-PDP-PAP.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC} dmaap.sink.topics.POLICY-PDP-PAP.apiKey=${envd:POLICY_PDP_PAP_API_KEY} dmaap.sink.topics.POLICY-PDP-PAP.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET} dmaap.sink.topics.POLICY-PDP-PAP.https=${envd:DMAAP_HTTPS:true} + +#Replace the properties with the following to use kafka message broker +#kafka.source.topics=policy-pdp-pap +#kafka.source.topics.fetchTimeout=15000 +#kafka.sink.topics=policy-pdp-pap +#kafka.source.topics.policy-pdp-pap.servers=${envd:DMAAP_SERVERS} +#kafka.source.topics.policy-pdp-pap.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC} +#kafka.source.topics.policy-pdp-pap.apiKey=${envd:POLICY_PDP_PAP_API_KEY} +#kafka.source.topics.policy-pdp-pap.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET} +#kafka.source.topics.policy-pdp-pap.https=${envd:DMAAP_HTTPS:true} + +#kafka.sink.topics.policy-pdp-pap.servers=${envd:DMAAP_SERVERS} +#kafka.sink.topics.policy-pdp-pap.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC} +#kafka.sink.topics.policy-pdp-pap.apiKey=${envd:POLICY_PDP_PAP_API_KEY} +#kafka.sink.topics.policy-pdp-pap.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET} +#kafka.sink.topics.policy-pdp-pap.https=${envd:DMAAP_HTTPS:true} diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java index 5491eac7..0c732246 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java +++ b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java @@ -271,6 +271,12 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory { } else { return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "."; } + } else if (commInfra == CommInfrastructure.KAFKA) { + if (isSource) { + return PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "."; + } else { + return PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "."; + } } else { throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra); } diff --git a/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java b/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java index b5cf319e..ad23ff3a 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java +++ b/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java @@ -1414,6 +1414,9 @@ public class RestManager implements SwaggerApi, DefaultApi, FeaturesApi, InputsA case NOOP: sources.addAll(TopicEndpointManager.getManager().getNoopTopicSources()); break; + case KAFKA: + sources.addAll(TopicEndpointManager.getManager().getKafkaTopicSources()); + break; default: status = Status.BAD_REQUEST; logger.debug("Invalid communication mechanism"); @@ -1449,6 +1452,9 @@ public class RestManager implements SwaggerApi, DefaultApi, FeaturesApi, InputsA case NOOP: sinks.addAll(TopicEndpointManager.getManager().getNoopTopicSinks()); break; + case KAFKA: + sinks.addAll(TopicEndpointManager.getManager().getKafkaTopicSinks()); + break; default: status = Status.BAD_REQUEST; logger.debug("Invalid communication mechanism"); -- cgit 1.2.3-korg