diff options
5 files changed, 120 insertions, 15 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java index 4295a0a1..5d36a313 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java @@ -43,6 +43,7 @@ public final class PolicyEndPointProperties { public static final String PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX = ".fetchTimeout"; public static final String PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX = ".fetchLimit"; public static final String PROPERTY_MANAGED_SUFFIX = ".managed"; + public static final String PROPERTY_ADDITIONAL_PROPS_SUFFIX = ".additionalProps"; public static final String PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX = ".partitionKey"; diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java index 03e20762..2e137ce7 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java @@ -3,7 +3,7 @@ * ONAP * ================================================================================ * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved. - * Modifications Copyright (C) 2022-2023 Nordix Foundation. + * Modifications Copyright (C) 2022-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,15 +21,22 @@ package org.onap.policy.common.endpoints.utils; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ADDITIONAL_PROPS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX; + +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.re2j.Pattern; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; @NoArgsConstructor(access = AccessLevel.PRIVATE) public class KafkaPropertyUtils { @@ -39,20 +46,35 @@ public class KafkaPropertyUtils { * Makes a topic builder, configuring it with properties that are common to both * sources and sinks. * - * @param props properties to be used to configure the builder - * @param topic topic being configured + * @param props properties to be used to configure the builder + * @param topic topic being configured * @param servers target servers * @return a topic builder */ public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) { final List<String> serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers))); - return BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, - topic)) - .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true)); + .servers(serverList) + .topic(topic) + .effectiveTopic(props.getString(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic)) + .managed(props.getBoolean(PROPERTY_MANAGED_SUFFIX, true)) + .additionalProps(getAdditionalProps(props.getString(PROPERTY_ADDITIONAL_PROPS_SUFFIX, ""))); + } + + private static Map<String, String> getAdditionalProps(String additionalPropsString) { + try { + Map<String, String> additionalProps = new HashMap<>(); + var converted = new ObjectMapper().readValue(additionalPropsString, Map.class); + converted.forEach((k, v) -> { + if (k instanceof String key && v instanceof String value) { + additionalProps.put(key, value); + } + }); + return additionalProps; + } catch (Exception e) { + return Collections.emptyMap(); + } + } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java index a00879c1..b49f58e2 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation. + * Copyright (C) 2022, 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,17 +26,25 @@ import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperti import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX; import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX; -import java.util.Arrays; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Collections; +import java.util.List; +import java.util.Map; import lombok.Getter; import org.onap.policy.common.endpoints.parameters.TopicParameters; +@Getter public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder { public static final String SERVER = "localhost:9092"; public static final String TOPIC2 = "my-topic-2"; + public static final String ADDITIONAL_PROPS = "{\"security.protocol\": \"SASL_PLAINTEXT\"," + + "\"sasl.mechanism\": \"SCRAM-SHA-512\",\"sasl.jaas.config\": " + + "\"org.apache.kafka.common.security.plain.PlainLoginModule " + + "required username=abc password=abc serviceName=kafka;\"}"; - @Getter - private TopicParameters params = new TopicParameters(); + private final TopicParameters params = new TopicParameters(); /** * Constructs the object. @@ -61,6 +69,7 @@ public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder { setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true"); setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION); setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); + setTopicProperty(".additionalProps", ADDITIONAL_PROPS); params.setTopicCommInfrastructure("kafka"); params.setTopic(topic); @@ -68,8 +77,17 @@ public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder { params.setManaged(true); params.setUseHttps(true); params.setPartitionId(MY_PARTITION); - params.setServers(Arrays.asList(SERVER)); + params.setServers(List.of(SERVER)); + params.setAdditionalProps(getAdditionalProps()); return this; } + + private Map<String, String> getAdditionalProps() { + try { + return new ObjectMapper().readValue(ADDITIONAL_PROPS, Map.class); + } catch (JsonProcessingException e) { + return Collections.emptyMap(); + } + } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java index 6aed6cdf..5ff6782f 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java @@ -22,6 +22,7 @@ package org.onap.policy.common.endpoints.event.comm.bus; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS; import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX; @@ -88,6 +89,7 @@ class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSink assertEquals(MY_TOPIC, params.getTopic()); assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic()); assertEquals(MY_PARTITION, params.getPartitionId()); + assertNotNull(params.getAdditionalProps()); List<KafkaTopicSink> topics2 = buildTopics(makePropBuilder().makeTopic(TOPIC3) .removeTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX).build()); @@ -101,6 +103,13 @@ class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSink } @Test + void testBuildFromProperties() { + Properties props = makePropBuilder().makeTopic(MY_TOPIC).build(); + var listTopic = factory.build(props); + assertNotNull(listTopic); + } + + @Test @Override void testDestroyString_testGet_testInventory() { super.testDestroyString_testGet_testInventory(); diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtilsTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtilsTest.java new file mode 100644 index 00000000..52caa470 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtilsTest.java @@ -0,0 +1,55 @@ +/*- + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.utils; + +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ADDITIONAL_PROPS_SUFFIX; + +import java.util.Properties; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class KafkaPropertyUtilsTest { + + @Test + void test() { + var properties = new Properties(); + properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX, "{444-"); + PropertyUtils props = new PropertyUtils(properties, "mytopic", null); + + var build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build(); + Assertions.assertTrue(build.getAdditionalProps().isEmpty()); + + properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX, + "{\"security.protocol\": \"SASL_PLAINTEXT\"}"); + build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build(); + Assertions.assertTrue(build.getAdditionalProps().containsKey("security.protocol")); + + properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX, + "{\"security.protocol\": false }"); + build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build(); + Assertions.assertTrue(build.getAdditionalProps().isEmpty()); + + properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX, ""); + build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build(); + Assertions.assertTrue(build.getAdditionalProps().isEmpty()); + } + +}
\ No newline at end of file |