From 609de0787201d7b6f7fcc994442bb70a87ec6256 Mon Sep 17 00:00:00 2001 From: "adheli.tavares" Date: Mon, 26 Aug 2024 14:12:46 +0100 Subject: Read additionalProps from .properties files used in drools Issue-ID: POLICY-5120 Change-Id: I2cb5cae3e6456f32a72e4b2bda0819b3eb77653e Signed-off-by: adheli.tavares --- .../properties/PolicyEndPointProperties.java | 1 + .../common/endpoints/utils/KafkaPropertyUtils.java | 42 +++++++++++++---- .../event/comm/bus/KafkaTopicPropertyBuilder.java | 28 +++++++++-- .../event/comm/bus/KafkaTopicSinkFactoryTest.java | 9 ++++ .../endpoints/utils/KafkaPropertyUtilsTest.java | 55 ++++++++++++++++++++++ 5 files changed, 120 insertions(+), 15 deletions(-) create mode 100644 policy-endpoints/src/test/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtilsTest.java (limited to 'policy-endpoints') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java index 4295a0a1..5d36a313 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java @@ -43,6 +43,7 @@ public final class PolicyEndPointProperties { public static final String PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX = ".fetchTimeout"; public static final String PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX = ".fetchLimit"; public static final String PROPERTY_MANAGED_SUFFIX = ".managed"; + public static final String PROPERTY_ADDITIONAL_PROPS_SUFFIX = ".additionalProps"; public static final String PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX = ".partitionKey"; diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java index 03e20762..2e137ce7 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java @@ -3,7 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2022-2023 Nordix Foundation. + * Modifications Copyright (C) 2022-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,15 +21,22 @@ package org.onap.policy.common.endpoints.utils; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ADDITIONAL_PROPS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX; + +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.re2j.Pattern; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; @NoArgsConstructor(access = AccessLevel.PRIVATE) public class KafkaPropertyUtils { @@ -39,20 +46,35 @@ public class KafkaPropertyUtils { * Makes a topic builder, configuring it with properties that are common to both * sources and sinks. * - * @param props properties to be used to configure the builder - * @param topic topic being configured + * @param props properties to be used to configure the builder + * @param topic topic being configured * @param servers target servers * @return a topic builder */ public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) { final List serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers))); - return BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, - topic)) - .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true)); + .servers(serverList) + .topic(topic) + .effectiveTopic(props.getString(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic)) + .managed(props.getBoolean(PROPERTY_MANAGED_SUFFIX, true)) + .additionalProps(getAdditionalProps(props.getString(PROPERTY_ADDITIONAL_PROPS_SUFFIX, ""))); + } + + private static Map getAdditionalProps(String additionalPropsString) { + try { + Map additionalProps = new HashMap<>(); + var converted = new ObjectMapper().readValue(additionalPropsString, Map.class); + converted.forEach((k, v) -> { + if (k instanceof String key && v instanceof String value) { + additionalProps.put(key, value); + } + }); + return additionalProps; + } catch (Exception e) { + return Collections.emptyMap(); + } + } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java index a00879c1..b49f58e2 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation. + * Copyright (C) 2022, 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,17 +26,25 @@ import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperti import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX; import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX; -import java.util.Arrays; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Collections; +import java.util.List; +import java.util.Map; import lombok.Getter; import org.onap.policy.common.endpoints.parameters.TopicParameters; +@Getter public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder { public static final String SERVER = "localhost:9092"; public static final String TOPIC2 = "my-topic-2"; + public static final String ADDITIONAL_PROPS = "{\"security.protocol\": \"SASL_PLAINTEXT\"," + + "\"sasl.mechanism\": \"SCRAM-SHA-512\",\"sasl.jaas.config\": " + + "\"org.apache.kafka.common.security.plain.PlainLoginModule " + + "required username=abc password=abc serviceName=kafka;\"}"; - @Getter - private TopicParameters params = new TopicParameters(); + private final TopicParameters params = new TopicParameters(); /** * Constructs the object. @@ -61,6 +69,7 @@ public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder { setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true"); setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION); setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); + setTopicProperty(".additionalProps", ADDITIONAL_PROPS); params.setTopicCommInfrastructure("kafka"); params.setTopic(topic); @@ -68,8 +77,17 @@ public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder { params.setManaged(true); params.setUseHttps(true); params.setPartitionId(MY_PARTITION); - params.setServers(Arrays.asList(SERVER)); + params.setServers(List.of(SERVER)); + params.setAdditionalProps(getAdditionalProps()); return this; } + + private Map getAdditionalProps() { + try { + return new ObjectMapper().readValue(ADDITIONAL_PROPS, Map.class); + } catch (JsonProcessingException e) { + return Collections.emptyMap(); + } + } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java index 6aed6cdf..5ff6782f 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java @@ -22,6 +22,7 @@ package org.onap.policy.common.endpoints.event.comm.bus; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS; import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX; @@ -88,6 +89,7 @@ class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase topics2 = buildTopics(makePropBuilder().makeTopic(TOPIC3) .removeTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX).build()); @@ -100,6 +102,13 @@ class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase