aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java1
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java42
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java28
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java9
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtilsTest.java55
5 files changed, 120 insertions, 15 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
index 4295a0a1..5d36a313 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
@@ -43,6 +43,7 @@ public final class PolicyEndPointProperties {
public static final String PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX = ".fetchTimeout";
public static final String PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX = ".fetchLimit";
public static final String PROPERTY_MANAGED_SUFFIX = ".managed";
+ public static final String PROPERTY_ADDITIONAL_PROPS_SUFFIX = ".additionalProps";
public static final String PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX = ".partitionKey";
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java
index 03e20762..2e137ce7 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java
@@ -3,7 +3,7 @@
* ONAP
* ================================================================================
* Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2022-2023 Nordix Foundation.
+ * Modifications Copyright (C) 2022-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,15 +21,22 @@
package org.onap.policy.common.endpoints.utils;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ADDITIONAL_PROPS_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.re2j.Pattern;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class KafkaPropertyUtils {
@@ -39,20 +46,35 @@ public class KafkaPropertyUtils {
* Makes a topic builder, configuring it with properties that are common to both
* sources and sinks.
*
- * @param props properties to be used to configure the builder
- * @param topic topic being configured
+ * @param props properties to be used to configure the builder
+ * @param topic topic being configured
* @param servers target servers
* @return a topic builder
*/
public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) {
final List<String> serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers)));
-
return BusTopicParams.builder()
- .servers(serverList)
- .topic(topic)
- .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX,
- topic))
- .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true));
+ .servers(serverList)
+ .topic(topic)
+ .effectiveTopic(props.getString(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic))
+ .managed(props.getBoolean(PROPERTY_MANAGED_SUFFIX, true))
+ .additionalProps(getAdditionalProps(props.getString(PROPERTY_ADDITIONAL_PROPS_SUFFIX, "")));
+ }
+
+ private static Map<String, String> getAdditionalProps(String additionalPropsString) {
+ try {
+ Map<String, String> additionalProps = new HashMap<>();
+ var converted = new ObjectMapper().readValue(additionalPropsString, Map.class);
+ converted.forEach((k, v) -> {
+ if (k instanceof String key && v instanceof String value) {
+ additionalProps.put(key, value);
+ }
+ });
+ return additionalProps;
+ } catch (Exception e) {
+ return Collections.emptyMap();
+ }
+
}
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
index a00879c1..b49f58e2 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022, 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,17 +26,25 @@ import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperti
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX;
-import java.util.Arrays;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import lombok.Getter;
import org.onap.policy.common.endpoints.parameters.TopicParameters;
+@Getter
public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
public static final String SERVER = "localhost:9092";
public static final String TOPIC2 = "my-topic-2";
+ public static final String ADDITIONAL_PROPS = "{\"security.protocol\": \"SASL_PLAINTEXT\","
+ + "\"sasl.mechanism\": \"SCRAM-SHA-512\",\"sasl.jaas.config\": "
+ + "\"org.apache.kafka.common.security.plain.PlainLoginModule "
+ + "required username=abc password=abc serviceName=kafka;\"}";
- @Getter
- private TopicParameters params = new TopicParameters();
+ private final TopicParameters params = new TopicParameters();
/**
* Constructs the object.
@@ -61,6 +69,7 @@ public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true");
setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION);
setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
+ setTopicProperty(".additionalProps", ADDITIONAL_PROPS);
params.setTopicCommInfrastructure("kafka");
params.setTopic(topic);
@@ -68,8 +77,17 @@ public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
params.setManaged(true);
params.setUseHttps(true);
params.setPartitionId(MY_PARTITION);
- params.setServers(Arrays.asList(SERVER));
+ params.setServers(List.of(SERVER));
+ params.setAdditionalProps(getAdditionalProps());
return this;
}
+
+ private Map<String, String> getAdditionalProps() {
+ try {
+ return new ObjectMapper().readValue(ADDITIONAL_PROPS, Map.class);
+ } catch (JsonProcessingException e) {
+ return Collections.emptyMap();
+ }
+ }
}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
index 6aed6cdf..5ff6782f 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
@@ -22,6 +22,7 @@ package org.onap.policy.common.endpoints.event.comm.bus;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS;
import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
@@ -88,6 +89,7 @@ class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSink
assertEquals(MY_TOPIC, params.getTopic());
assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
assertEquals(MY_PARTITION, params.getPartitionId());
+ assertNotNull(params.getAdditionalProps());
List<KafkaTopicSink> topics2 = buildTopics(makePropBuilder().makeTopic(TOPIC3)
.removeTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX).build());
@@ -101,6 +103,13 @@ class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSink
}
@Test
+ void testBuildFromProperties() {
+ Properties props = makePropBuilder().makeTopic(MY_TOPIC).build();
+ var listTopic = factory.build(props);
+ assertNotNull(listTopic);
+ }
+
+ @Test
@Override
void testDestroyString_testGet_testInventory() {
super.testDestroyString_testGet_testInventory();
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtilsTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtilsTest.java
new file mode 100644
index 00000000..52caa470
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtilsTest.java
@@ -0,0 +1,55 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.utils;
+
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ADDITIONAL_PROPS_SUFFIX;
+
+import java.util.Properties;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class KafkaPropertyUtilsTest {
+
+ @Test
+ void test() {
+ var properties = new Properties();
+ properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX, "{444-");
+ PropertyUtils props = new PropertyUtils(properties, "mytopic", null);
+
+ var build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build();
+ Assertions.assertTrue(build.getAdditionalProps().isEmpty());
+
+ properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX,
+ "{\"security.protocol\": \"SASL_PLAINTEXT\"}");
+ build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build();
+ Assertions.assertTrue(build.getAdditionalProps().containsKey("security.protocol"));
+
+ properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX,
+ "{\"security.protocol\": false }");
+ build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build();
+ Assertions.assertTrue(build.getAdditionalProps().isEmpty());
+
+ properties.setProperty("mytopic" + PROPERTY_ADDITIONAL_PROPS_SUFFIX, "");
+ build = KafkaPropertyUtils.makeBuilder(props, "mytopic", "servers").build();
+ Assertions.assertTrue(build.getAdditionalProps().isEmpty());
+ }
+
+} \ No newline at end of file