aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2019-06-24 15:46:36 -0400
committerJim Hahn <jrh3@att.com>2019-06-26 08:55:15 -0400
commit67fcc6f6abb7904ecd4b4444fa23b355cf9fd4ae (patch)
treef5c63d96a4d337e8210f4d83c3dd14caf8e1c051 /policy-endpoints/src/main/java/org/onap
parentd1ab0ec8471deeb7739206dc2ef0aac3dc5b245f (diff)
Fix some sonar issues in policy-endpoints
Refactored various classes to reduce cyclomatic complexity. Introduced some endpoint utility classes to facilitate extraction and conversion of property values, and populating of common "builder" values. Change-Id: Ie1c91cd94cb54700dc9127f72780b4d94b82ec39 Issue-ID: POLICY-1791 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java200
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java249
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java104
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java144
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java152
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java31
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java111
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java42
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java182
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java122
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/PropertyUtils.java135
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/UebPropertyUtils.java66
12 files changed, 712 insertions, 826 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java
index 4ccf08de..6f0753b3 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,15 +21,15 @@
package org.onap.policy.common.endpoints.event.comm.bus;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
-
+import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils;
+import org.onap.policy.common.endpoints.utils.PropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +53,7 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
@Override
public DmaapTopicSink build(BusTopicParams busTopicParams) {
- if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
+ if (StringUtils.isBlank(busTopicParams.getTopic())) {
throw new IllegalArgumentException(MISSING_TOPIC);
}
@@ -86,178 +86,44 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
public List<DmaapTopicSink> build(Properties properties) {
String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS);
- if (writeTopics == null || writeTopics.isEmpty()) {
+ if (StringUtils.isBlank(writeTopics)) {
logger.info("{}: no topic for DMaaP Sink", this);
return new ArrayList<>();
}
- List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>();
synchronized (this) {
- for (String topic : writeTopicList) {
- if (this.dmaapTopicWriters.containsKey(topic)) {
- newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
- continue;
- }
- String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-
- List<String> serverList;
- if (servers != null && !servers.isEmpty()) {
- serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
- } else {
- serverList = new ArrayList<>();
- }
-
- final String effectiveTopic = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic);
-
- final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
- final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
-
- final String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
- final String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
-
- final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
-
- final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
-
- /* DME2 Properties */
-
- final String dme2Environment = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
-
- final String dme2AftEnvironment = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
-
- final String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
-
- final String dme2RouteOffer = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
-
- final String dme2Latitude = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
-
- final String dme2Longitude = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
-
- final String dme2EpReadTimeoutMs = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
-
- final String dme2EpConnTimeout = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
-
- final String dme2RoundtripTimeoutMs =
- properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
-
- final String dme2Version = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
-
- final String dme2SubContextPath = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
-
- final String dme2SessionStickinessRequired =
- properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS
- + "." + topic
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
-
- Map<String, String> dme2AdditionalProps = new HashMap<>();
-
- if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
- dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
- }
- if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
- dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
- }
- if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
- dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
- }
- if (dme2Version != null && !dme2Version.isEmpty()) {
- dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
- }
- if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
- dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- }
- if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
- dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
- }
- if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
- dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
- }
-
- if (servers == null || servers.isEmpty()) {
- logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
- continue;
- }
-
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-
- // default is to use HTTP if no https property exists
- boolean useHttps = false;
- if (useHttpsString != null && !useHttpsString.isEmpty()) {
- useHttps = Boolean.parseBoolean(useHttpsString);
- }
-
- String allowSelfSignedCertsString =
- properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
-
- // default is to disallow self-signed certs
- boolean allowSelfSignedCerts = false;
- if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
- allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
- }
-
- DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder()
- .servers(serverList)
- .topic(topic)
- .effectiveTopic(effectiveTopic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .userName(aafMechId)
- .password(aafPassword)
- .partitionId(partitionKey)
- .environment(dme2Environment)
- .aftEnvironment(dme2AftEnvironment)
- .partner(dme2Partner)
- .latitude(dme2Latitude)
- .longitude(dme2Longitude)
- .additionalProps(dme2AdditionalProps)
- .managed(managed)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts)
- .build());
-
- newDmaapTopicSinks.add(dmaapTopicSink);
+ for (String topic : writeTopics.split("\\s*,\\s*")) {
+ addTopic(newDmaapTopicSinks, properties, topic);
}
return newDmaapTopicSinks;
}
}
+ private void addTopic(List<DmaapTopicSink> newDmaapTopicSinks, Properties properties, String topic) {
+ if (this.dmaapTopicWriters.containsKey(topic)) {
+ newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
+ return;
+ }
+
+ String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic;
+
+ PropertyUtils props = new PropertyUtils(properties, topicPrefix,
+ (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
+
+ String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+ if (StringUtils.isBlank(servers)) {
+ logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
+ return;
+ }
+
+ DmaapTopicSink dmaapTopicSink = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers)
+ .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
+ .build());
+
+ newDmaapTopicSinks.add(dmaapTopicSink);
+ }
+
/**
* Makes a new sink.
*
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java
index ddc3321f..c895a409 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,15 +21,15 @@
package org.onap.policy.common.endpoints.event.comm.bus;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
-
+import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils;
+import org.onap.policy.common.endpoints.utils.PropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,214 +75,15 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
public List<DmaapTopicSource> build(Properties properties) {
String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
- if (readTopics == null || readTopics.isEmpty()) {
+ if (StringUtils.isBlank(readTopics)) {
logger.info("{}: no topic for DMaaP Source", this);
return new ArrayList<>();
}
- List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
synchronized (this) {
- for (String topic : readTopicList) {
- if (this.dmaapTopicSources.containsKey(topic)) {
- dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
- continue;
- }
-
- String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-
- List<String> serverList;
- if (servers != null && !servers.isEmpty()) {
- serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
- } else {
- serverList = new ArrayList<>();
- }
-
- final String effectiveTopic = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic);
-
- final String apiKey = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
-
- final String apiSecret = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
-
- final String aafMechId = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
-
- final String aafPassword = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
-
- final String consumerGroup = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
-
- final String consumerInstance = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
-
- final String fetchTimeoutString = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
-
- /* DME2 Properties */
-
- final String dme2Environment = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
-
- final String dme2AftEnvironment = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
-
- final String dme2Partner = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
-
- final String dme2RouteOffer = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
-
- final String dme2Latitude = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
-
- final String dme2Longitude = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
-
- final String dme2EpReadTimeoutMs =
- properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
-
- final String dme2EpConnTimeout = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
-
- final String dme2RoundtripTimeoutMs =
- properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
-
- final String dme2Version = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
-
- final String dme2SubContextPath = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
-
- final String dme2SessionStickinessRequired =
- properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
-
- Map<String, String> dme2AdditionalProps = new HashMap<>();
-
- if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
- dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
- }
- if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
- dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
- }
- if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
- dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
- }
- if (dme2Version != null && !dme2Version.isEmpty()) {
- dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
- }
- if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
- dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
- }
- if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
- dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
- }
- if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
- dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
- }
-
- if (servers == null || servers.isEmpty()) {
- logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
- continue;
- }
-
- int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
- if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
- try {
- fetchTimeout = Integer.parseInt(fetchTimeoutString);
- } catch (NumberFormatException nfe) {
- logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
- topic);
- }
- }
-
- String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
- int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
- if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
- try {
- fetchLimit = Integer.parseInt(fetchLimitString);
- } catch (NumberFormatException nfe) {
- logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
- topic);
- }
- }
-
- String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-
- // default is to use HTTP if no https property exists
- boolean useHttps = false;
- if (useHttpsString != null && !useHttpsString.isEmpty()) {
- useHttps = Boolean.parseBoolean(useHttpsString);
- }
-
- String allowSelfSignedCertsString =
- properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
-
- // default is to disallow self-signed certs
- boolean allowSelfSignedCerts = false;
- if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
- allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
- }
-
-
- DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder()
- .servers(serverList)
- .topic(topic)
- .effectiveTopic(effectiveTopic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .userName(aafMechId)
- .password(aafPassword)
- .consumerGroup(consumerGroup)
- .consumerInstance(consumerInstance)
- .fetchTimeout(fetchTimeout)
- .fetchLimit(fetchLimit)
- .environment(dme2Environment)
- .aftEnvironment(dme2AftEnvironment)
- .partner(dme2Partner)
- .latitude(dme2Latitude)
- .longitude(dme2Longitude)
- .additionalProps(dme2AdditionalProps)
- .managed(managed)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts)
- .build());
-
- dmaapTopicSourceLst.add(uebTopicSource);
+ for (String topic : readTopics.split("\\s*,\\s*")) {
+ addTopic(dmaapTopicSourceLst, properties, topic);
}
}
return dmaapTopicSourceLst;
@@ -308,9 +109,41 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
return this.build(servers, topic, null, null);
}
+ private void addTopic(List<DmaapTopicSource> dmaapTopicSourceLst, Properties properties, String topic) {
+ if (this.dmaapTopicSources.containsKey(topic)) {
+ dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
+ return;
+ }
+
+ String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic;
+
+ PropertyUtils props = new PropertyUtils(properties, topicPrefix,
+ (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
+
+ String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+ if (StringUtils.isBlank(servers)) {
+ logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
+ return;
+ }
+
+ DmaapTopicSource uebTopicSource = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers)
+ .consumerGroup(props.getString(
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
+ .consumerInstance(props.getString(
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
+ .fetchTimeout(props.getInteger(
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
+ DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH))
+ .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
+ DmaapTopicSource.DEFAULT_LIMIT_FETCH))
+ .build());
+
+ dmaapTopicSourceLst.add(uebTopicSource);
+ }
+
/**
* Makes a new source.
- *
+ *
* @param busTopicParams parameters to use to configure the source
* @return a new source
*/
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java
index 62437823..150a02c0 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,14 +21,15 @@
package org.onap.policy.common.endpoints.event.comm.bus;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
-
+import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.endpoints.utils.PropertyUtils;
+import org.onap.policy.common.endpoints.utils.UebPropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
throw new IllegalArgumentException("UEB Server(s) must be provided");
}
- if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
+ if (StringUtils.isBlank(busTopicParams.getTopic())) {
throw new IllegalArgumentException(MISSING_TOPIC);
}
@@ -91,82 +92,43 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
public List<UebTopicSink> build(Properties properties) {
String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS);
- if (writeTopics == null || writeTopics.isEmpty()) {
+ if (StringUtils.isBlank(writeTopics)) {
logger.info("{}: no topic for UEB Sink", this);
return new ArrayList<>();
}
- List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
synchronized (this) {
- for (String topic : writeTopicList) {
- if (this.uebTopicSinks.containsKey(topic)) {
- newUebTopicSinks.add(this.uebTopicSinks.get(topic));
- continue;
- }
-
- String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
- if (servers == null || servers.isEmpty()) {
- logger.error("{}: no UEB servers configured for sink {}", this, topic);
- continue;
- }
-
- final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
-
- final String effectiveTopic = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic);
- final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
- final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
- final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
-
- String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-
- // default is to use HTTP if no https property exists
- boolean useHttps = false;
- if (useHttpsString != null && !useHttpsString.isEmpty()) {
- useHttps = Boolean.parseBoolean(useHttpsString);
- }
-
-
- String allowSelfSignedCertsString =
- properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
-
- // default is to disallow self-signed certs
- boolean allowSelfSignedCerts = false;
- if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
- allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
- }
-
- UebTopicSink uebTopicWriter = this.build(BusTopicParams.builder()
- .servers(serverList)
- .topic(topic)
- .effectiveTopic(effectiveTopic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .partitionId(partitionKey)
- .managed(managed)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts)
- .build());
- newUebTopicSinks.add(uebTopicWriter);
+ for (String topic : writeTopics.split("\\s*,\\s*")) {
+ addTopic(newUebTopicSinks, topic, properties);
}
return newUebTopicSinks;
}
}
+ private void addTopic(List<UebTopicSink> newUebTopicSinks, String topic, Properties properties) {
+ if (this.uebTopicSinks.containsKey(topic)) {
+ newUebTopicSinks.add(this.uebTopicSinks.get(topic));
+ return;
+ }
+
+ String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic;
+
+ PropertyUtils props = new PropertyUtils(properties, topicPrefix,
+ (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
+
+ String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+ if (StringUtils.isBlank(servers)) {
+ logger.error("{}: no UEB servers configured for sink {}", this, topic);
+ return;
+ }
+
+ UebTopicSink uebTopicWriter = this.build(UebPropertyUtils.makeBuilder(props, topic, servers)
+ .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
+ .build());
+ newUebTopicSinks.add(uebTopicWriter);
+ }
+
@Override
public void destroy(String topic) {
@@ -221,7 +183,7 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
/**
* Makes a new sink.
- *
+ *
* @param busTopicParams parameters to use to configure the sink
* @return a new sink
*/
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
index f3ef8fdc..6655aa12 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,14 +21,15 @@
package org.onap.policy.common.endpoints.event.comm.bus;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
-
+import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.endpoints.utils.PropertyUtils;
+import org.onap.policy.common.endpoints.utils.UebPropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,110 +78,15 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
public List<UebTopicSource> build(Properties properties) {
String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
- if (readTopics == null || readTopics.isEmpty()) {
+ if (StringUtils.isBlank(readTopics)) {
logger.info("{}: no topic for UEB Source", this);
return new ArrayList<>();
}
- List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
List<UebTopicSource> newUebTopicSources = new ArrayList<>();
synchronized (this) {
- for (String topic : readTopicList) {
- if (this.uebTopicSources.containsKey(topic)) {
- newUebTopicSources.add(this.uebTopicSources.get(topic));
- continue;
- }
-
- String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-
- if (servers == null || servers.isEmpty()) {
- logger.error("{}: no UEB servers configured for sink {}", this, topic);
- continue;
- }
-
- final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
-
- final String effectiveTopic = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic);
-
- final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
-
- final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
-
- final String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
-
- final String consumerInstance = properties.getProperty(
- PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
-
- String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
- int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
- if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
- try {
- fetchTimeout = Integer.parseInt(fetchTimeoutString);
- } catch (NumberFormatException nfe) {
- logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
- topic);
- }
- }
-
- String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
- + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
- int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
- if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
- try {
- fetchLimit = Integer.parseInt(fetchLimitString);
- } catch (NumberFormatException nfe) {
- logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
- topic);
- }
- }
-
- String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
- + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
-
- // default is to use HTTP if no https property exists
- boolean useHttps = false;
- if (useHttpsString != null && !useHttpsString.isEmpty()) {
- useHttps = Boolean.parseBoolean(useHttpsString);
- }
-
- String allowSelfSignedCertsString =
- properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
- + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
-
- // default is to disallow self-signed certs
- boolean allowSelfSignedCerts = false;
- if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
- allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
- }
-
- UebTopicSource uebTopicSource = this.build(BusTopicParams.builder()
- .servers(serverList)
- .topic(topic)
- .effectiveTopic(effectiveTopic)
- .apiKey(apiKey)
- .apiSecret(apiSecret)
- .consumerGroup(consumerGroup)
- .consumerInstance(consumerInstance)
- .fetchTimeout(fetchTimeout)
- .fetchLimit(fetchLimit)
- .managed(managed)
- .useHttps(useHttps)
- .allowSelfSignedCerts(allowSelfSignedCerts).build());
- newUebTopicSources.add(uebTopicSource);
+ for (String topic : readTopics.split("\\s*,\\s*")) {
+ addTopic(newUebTopicSources, topic, properties);
}
}
return newUebTopicSources;
@@ -206,9 +112,41 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
return this.build(servers, topic, null, null);
}
+ private void addTopic(List<UebTopicSource> newUebTopicSources, String topic, Properties properties) {
+ if (this.uebTopicSources.containsKey(topic)) {
+ newUebTopicSources.add(this.uebTopicSources.get(topic));
+ return;
+ }
+
+ String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic;
+
+ PropertyUtils props = new PropertyUtils(properties, topicPrefix,
+ (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
+
+ String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+ if (StringUtils.isBlank(servers)) {
+ logger.error("{}: no UEB servers configured for sink {}", this, topic);
+ return;
+ }
+
+ UebTopicSource uebTopicSource = this.build(UebPropertyUtils.makeBuilder(props, topic, servers)
+ .consumerGroup(props.getString(
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
+ .consumerInstance(props.getString(
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
+ .fetchTimeout(props.getInteger(
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
+ UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH))
+ .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
+ UebTopicSource.DEFAULT_LIMIT_FETCH))
+ .build());
+
+ newUebTopicSources.add(uebTopicSource);
+ }
+
/**
* Makes a new source.
- *
+ *
* @param busTopicParams parameters to use to configure the source
* @return a new source
*/
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index 1c85fa97..67adf3b4 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -178,27 +178,50 @@ public interface BusPublisher {
}
+ configureProtocol(topic, protocol, servers, useHttps);
+
+ this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
+
+ this.publisher.setUsername(username);
+ this.publisher.setPassword(password);
+
+ props = new Properties();
+
+ props.setProperty("Protocol", (useHttps ? "https" : "http"));
+ props.setProperty("contenttype", "application/json");
+ props.setProperty("username", username);
+ props.setProperty("password", password);
+
+ props.setProperty("topic", topic);
+
+ this.publisher.setProps(props);
+
+ if (protocol == ProtocolTypeConstants.AAF_AUTH) {
+ this.publisher.setHost(servers.get(0));
+ }
+
+ logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
+ }
+
+ private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers,
+ boolean useHttps) {
+
if (protocol == ProtocolTypeConstants.AAF_AUTH) {
if (servers == null || servers.isEmpty()) {
throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
}
ArrayList<String> dmaapServers = new ArrayList<>();
- if (useHttps) {
- for (String server : servers) {
- dmaapServers.add(server + ":3905");
- }
-
- } else {
- for (String server : servers) {
- dmaapServers.add(server + ":3904");
- }
+ String port = useHttps ? ":3905" : ":3904";
+ for (String server : servers) {
+ dmaapServers.add(server + port);
}
this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+
} else if (protocol == ProtocolTypeConstants.DME2) {
ArrayList<String> dmaapServers = new ArrayList<>();
dmaapServers.add("0.0.0.0:3904");
@@ -206,36 +229,10 @@ public interface BusPublisher {
this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
- } else {
- throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
- }
-
- this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
-
- this.publisher.setUsername(username);
- this.publisher.setPassword(password);
-
- props = new Properties();
- if (useHttps) {
- props.setProperty("Protocol", "https");
} else {
- props.setProperty("Protocol", "http");
- }
-
- props.setProperty("contenttype", "application/json");
- props.setProperty("username", username);
- props.setProperty("password", password);
-
- props.setProperty("topic", topic);
-
- this.publisher.setProps(props);
-
- if (protocol == ProtocolTypeConstants.AAF_AUTH) {
- this.publisher.setHost(servers.get(0));
+ throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
}
-
- logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
}
@Override
@@ -300,38 +297,12 @@ public interface BusPublisher {
super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(),
busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps());
- String dme2RouteOffer = null;
- if (busTopicParams.isAdditionalPropsValid()) {
- dme2RouteOffer = busTopicParams.getAdditionalProps().get(
- DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
- }
- if (busTopicParams.isEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isAftEnvironmentInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
- }
- if (busTopicParams.isLatitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
- }
- if (busTopicParams.isLongitudeInvalid()) {
- throw parmException(busTopicParams.getTopic(),
- PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
- }
+ String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
+ ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
+ : null;
- if ((busTopicParams.isPartnerInvalid())
- && StringUtils.isBlank(dme2RouteOffer)) {
- throw new IllegalArgumentException(
- "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
- + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
- + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
- }
+ validateParams(busTopicParams, dme2RouteOffer);
String serviceName = busTopicParams.getServers().get(0);
@@ -366,19 +337,52 @@ public interface BusPublisher {
props.setProperty("MethodType", "POST");
if (busTopicParams.isAdditionalPropsValid()) {
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
-
- if (value != null) {
- props.setProperty(key, value);
- }
- }
+ addAdditionalProps(busTopicParams);
}
this.publisher.setProps(props);
}
+ private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) {
+ if (busTopicParams.isEnvironmentInvalid()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+ }
+ if (busTopicParams.isAftEnvironmentInvalid()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+ }
+ if (busTopicParams.isLatitudeInvalid()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+ }
+ if (busTopicParams.isLongitudeInvalid()) {
+ throw parmException(busTopicParams.getTopic(),
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+ }
+
+ if ((busTopicParams.isPartnerInvalid())
+ && StringUtils.isBlank(dme2RouteOffer)) {
+ throw new IllegalArgumentException(
+ "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ + busTopicParams.getTopic()
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
+ + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
+ + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ }
+ }
+
+ private void addAdditionalProps(BusTopicParams busTopicParams) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ if (value != null) {
+ props.setProperty(key, value);
+ }
+ }
+ }
+
private IllegalArgumentException parmException(String topic, String propnm) {
return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ topic + propnm + " property for DME2 in DMaaP");
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index 98e30e27..0953465b 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -21,6 +21,7 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
+import java.io.IOException;
import java.net.MalformedURLException;
import java.util.UUID;
@@ -223,19 +224,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
public void run() {
while (this.alive) {
try {
- for (String event : this.consumer.fetch()) {
- synchronized (this) {
- this.recentEvents.add(event);
- }
-
- NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
-
- broadcast(event);
-
- if (!this.alive) {
- break;
- }
- }
+ fetchAllMessages();
} catch (Exception e) {
logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
}
@@ -244,6 +233,22 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
logger.info("{}: exiting thread", this);
}
+ private void fetchAllMessages() throws InterruptedException, IOException {
+ for (String event : this.consumer.fetch()) {
+ synchronized (this) {
+ this.recentEvents.add(event);
+ }
+
+ NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
+
+ broadcast(event);
+
+ if (!this.alive) {
+ return;
+ }
+ }
+ }
+
@Override
public boolean offer(String event) {
if (!this.alive) {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java
index 5cc0071e..c2d0e400 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,15 +23,14 @@ package org.onap.policy.common.endpoints.http.client;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
-
import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.http.client.internal.JerseyClient;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.endpoints.utils.PropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,78 +69,54 @@ class IndexedHttpClientFactory implements HttpClientFactory {
ArrayList<HttpClient> clientList = new ArrayList<>();
String clientNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES);
- if (clientNames == null || clientNames.isEmpty()) {
+ if (StringUtils.isBlank(clientNames)) {
return clientList;
}
- List<String> clientNameList = new ArrayList<>(Arrays.asList(clientNames.split("\\s*,\\s*")));
-
- for (String clientName : clientNameList) {
- String httpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
- + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
- boolean https = false;
- if (StringUtils.isNotBlank(httpsString)) {
- https = Boolean.parseBoolean(httpsString);
- }
-
- String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
- + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX);
-
- String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES
- + "." + clientName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX);
- int port;
- try {
- if (servicePortString == null || servicePortString.isEmpty()) {
- continue;
- }
- port = Integer.parseInt(servicePortString);
- } catch (NumberFormatException nfe) {
- logger.error("http-client-factory: cannot parse port {}", servicePortString, nfe);
- continue;
- }
-
- String baseUrl = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
- + clientName + PolicyEndPointProperties.PROPERTY_HTTP_URL_SUFFIX);
-
- String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
- + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX);
-
- String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
- + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX);
-
- final String classProv = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES
- + "." + clientName + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER);
-
- String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "."
- + clientName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- try {
- HttpClient client =
- this.build(BusTopicParams.builder()
- .clientName(clientName)
- .useHttps(https)
- .allowSelfSignedCerts(https)
- .hostname(hostName)
- .port(port)
- .basePath(baseUrl)
- .userName(userName)
- .password(password)
- .managed(managed)
- .serializationProvider(classProv)
- .build());
- clientList.add(client);
- } catch (Exception e) {
- logger.error("http-client-factory: cannot build client {}", clientName, e);
- }
+ for (String clientName : clientNames.split("\\s*,\\s*")) {
+ addClient(clientList, clientName, properties);
}
return clientList;
}
+ private void addClient(ArrayList<HttpClient> clientList, String clientName, Properties properties) {
+ String clientPrefix = PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + clientName;
+
+ PropertyUtils props = new PropertyUtils(properties, clientPrefix,
+ (name, value, ex) ->
+ logger.warn("{}: {} {} is in invalid format for http client {} ", this, name, value, clientName));
+
+ int port = props.getInteger(PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX, -1);
+ if (port < 0) {
+ logger.warn("No HTTP port for client in {}", clientName);
+ return;
+ }
+
+ boolean https = props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false);
+
+ try {
+ HttpClient client = this.build(BusTopicParams.builder()
+ .clientName(clientName)
+ .useHttps(https)
+ .allowSelfSignedCerts(https)
+ .hostname(props.getString(PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, null))
+ .port(port)
+ .basePath(props.getString(PolicyEndPointProperties.PROPERTY_HTTP_URL_SUFFIX, null))
+ .userName(props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX,
+ null))
+ .password(props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX,
+ null))
+ .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true))
+ .serializationProvider(props.getString(
+ PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER, null))
+ .build());
+ clientList.add(client);
+ } catch (Exception e) {
+ logger.error("http-client-factory: cannot build client {}", clientName, e);
+ }
+ }
+
@Override
public synchronized HttpClient get(String name) {
if (clients.containsKey(name)) {
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java
index d4ccc494..4b73c5c4 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java
@@ -33,6 +33,7 @@ import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
@@ -84,8 +85,6 @@ public class JerseyClient implements HttpClient {
public JerseyClient(BusTopicParams busTopicParams)
throws KeyManagementException, NoSuchAlgorithmException, ClassNotFoundException {
- super();
-
if (busTopicParams.isClientNameInvalid()) {
throw new IllegalArgumentException("Name must be provided");
}
@@ -106,10 +105,23 @@ public class JerseyClient implements HttpClient {
this.userName = busTopicParams.getUserName();
this.password = busTopicParams.getPassword();
this.selfSignedCerts = busTopicParams.isAllowSelfSignedCerts();
+ this.client = detmClient();
+
+ if (!StringUtils.isBlank(this.userName) && !StringUtils.isBlank(this.password)) {
+ HttpAuthenticationFeature authFeature = HttpAuthenticationFeature.basic(userName, password);
+ this.client.register(authFeature);
+ }
+
+ this.client.property(ClientProperties.METAINF_SERVICES_LOOKUP_DISABLE, "true");
+
+ registerSerProviders(busTopicParams.getSerializationProvider());
+
+ this.baseUrl = (this.https ? "https://" : "http://") + this.hostname + ":" + this.port + "/"
+ + (this.basePath == null ? "" : this.basePath);
+ }
- StringBuilder tmpBaseUrl = new StringBuilder();
+ private Client detmClient() throws NoSuchAlgorithmException, KeyManagementException {
if (this.https) {
- tmpBaseUrl.append("https://");
ClientBuilder clientBuilder;
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
if (this.selfSignedCerts) {
@@ -120,23 +132,11 @@ public class JerseyClient implements HttpClient {
sslContext.init(null, null, null);
clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext);
}
- this.client = clientBuilder.build();
- } else {
- tmpBaseUrl.append("http://");
- this.client = ClientBuilder.newClient();
- }
+ return clientBuilder.build();
- if (this.userName != null && !this.userName.isEmpty() && this.password != null && !this.password.isEmpty()) {
- HttpAuthenticationFeature authFeature = HttpAuthenticationFeature.basic(userName, password);
- this.client.register(authFeature);
+ } else {
+ return ClientBuilder.newClient();
}
-
- registerSerProviders(busTopicParams.getSerializationProvider());
-
- this.client.property(ClientProperties.METAINF_SERVICES_LOOKUP_DISABLE, "true");
-
- this.baseUrl = tmpBaseUrl.append(this.hostname).append(":").append(this.port).append("/")
- .append((this.basePath == null) ? "" : this.basePath).toString();
}
/**
@@ -146,7 +146,7 @@ public class JerseyClient implements HttpClient {
* @throws ClassNotFoundException if the serialization provider cannot be found
*/
private void registerSerProviders(String serializationProvider) throws ClassNotFoundException {
- String providers = (serializationProvider == null || serializationProvider.isEmpty()
+ String providers = (StringUtils.isBlank(serializationProvider)
? JERSEY_DEFAULT_SERIALIZATION_PROVIDER : serializationProvider);
for (String prov : providers.split(",")) {
this.client.register(Class.forName(prov));
@@ -155,7 +155,7 @@ public class JerseyClient implements HttpClient {
@Override
public Response get(String path) {
- if (path != null && !path.isEmpty()) {
+ if (!StringUtils.isBlank(path)) {
return this.client.target(this.baseUrl).path(path).request().get();
} else {
return this.client.target(this.baseUrl).request().get();
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java
index b2c49eae..517ad208 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,12 +21,13 @@
package org.onap.policy.common.endpoints.http.server;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.endpoints.http.server.internal.JettyJerseyServer;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.endpoints.utils.PropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,132 +76,111 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory {
ArrayList<HttpServletServer> serviceList = new ArrayList<>();
String serviceNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES);
- if (serviceNames == null || serviceNames.isEmpty()) {
+ if (StringUtils.isBlank(serviceNames)) {
logger.warn("No topic for HTTP Service: {}", properties);
return serviceList;
}
- List<String> serviceNameList = Arrays.asList(serviceNames.split(SPACES_COMMA_SPACES));
-
- for (String serviceName : serviceNameList) {
- String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
- + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX);
-
- int servicePort;
- try {
- if (servicePortString == null || servicePortString.isEmpty()) {
- if (logger.isWarnEnabled()) {
- logger.warn("No HTTP port for service in {}", serviceName);
- }
- continue;
- }
- servicePort = Integer.parseInt(servicePortString);
- } catch (NumberFormatException nfe) {
- if (logger.isWarnEnabled()) {
- logger.warn("No HTTP port for service in {}", serviceName);
- }
- continue;
- }
+ for (String serviceName : serviceNames.split(SPACES_COMMA_SPACES)) {
+ addService(serviceList, serviceName, properties);
+ }
- final String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
- + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX);
+ return serviceList;
+ }
- final String contextUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
- + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX);
+ private void addService(ArrayList<HttpServletServer> serviceList, String serviceName, Properties properties) {
- final String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
- + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX);
+ String servicePrefix = PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + serviceName;
- final String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
- + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX);
+ PropertyUtils props = new PropertyUtils(properties, servicePrefix,
+ (name, value, ex) -> logger
+ .warn("{}: {} {} is in invalid format for http service {} ", this, name, value, serviceName));
- final String authUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
- + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX);
+ int servicePort = props.getInteger(PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX, -1);
+ if (servicePort < 0) {
+ logger.warn("No HTTP port for service in {}", serviceName);
+ return;
+ }
- final String restClasses = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
- + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX);
+ final String hostName = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, null);
+ final String contextUriPath =
+ props.getString(PolicyEndPointProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX, null);
+ boolean managed = props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true);
+ boolean swagger = props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, false);
+ boolean https = props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false);
- final String filterClasses = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
- + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_FILTER_CLASSES_SUFFIX);
+ // create the service
+ HttpServletServer service = build(serviceName, https, hostName, servicePort, contextUriPath, swagger, managed);
- final String restPackages = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
- + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX);
+ // configure the service
+ setSerializationProvider(props, service);
+ setAuthentication(props, service, contextUriPath);
- final String restUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
- + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX);
+ final String restUriPath = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX, null);
- final String classProv = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
- + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER);
-
- final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES
- + "." + serviceName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
+ addFilterClasses(props, service, restUriPath);
+ addServletClasses(props, service, restUriPath);
+ addServletPackages(props, service, restUriPath);
- String swaggerString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
- + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX);
- boolean swagger = false;
- if (swaggerString != null && !swaggerString.isEmpty()) {
- swagger = Boolean.parseBoolean(swaggerString);
- }
+ serviceList.add(service);
+ }
- String httpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
- + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
- boolean https = false;
- if (httpsString != null && !httpsString.isEmpty()) {
- https = Boolean.parseBoolean(httpsString);
- }
+ private void setSerializationProvider(PropertyUtils props, HttpServletServer service) {
- String aafString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "."
- + serviceName + PolicyEndPointProperties.PROPERTY_AAF_SUFFIX);
- boolean aaf = false;
- if (aafString != null && !aafString.isEmpty()) {
- aaf = Boolean.parseBoolean(aafString);
- }
+ final String classProv = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER, null);
-
- HttpServletServer service = build(serviceName, https, hostName, servicePort, contextUriPath, swagger,
- managed);
+ if (!StringUtils.isBlank(classProv)) {
+ service.setSerializationProvider(classProv);
+ }
+ }
- if (classProv != null && !classProv.isEmpty()) {
- service.setSerializationProvider(classProv);
- }
+ private void setAuthentication(PropertyUtils props, HttpServletServer service, final String contextUriPath) {
+ /* authentication method either AAF or HTTP Basic Auth */
- /* authentication method either AAF or HTTP Basic Auth */
+ boolean aaf = props.getBoolean(PolicyEndPointProperties.PROPERTY_AAF_SUFFIX, false);
+ final String userName = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, null);
+ final String password = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, null);
+ final String authUriPath = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX, null);
- if (aaf) {
- service.setAafAuthentication(contextUriPath);
- } else if (userName != null && !userName.isEmpty() && password != null && !password.isEmpty()) {
- service.setBasicAuthentication(userName, password, authUriPath);
- }
+ if (aaf) {
+ service.setAafAuthentication(contextUriPath);
+ } else if (!StringUtils.isBlank(userName) && !StringUtils.isBlank(password)) {
+ service.setBasicAuthentication(userName, password, authUriPath);
+ }
+ }
- if (filterClasses != null && !filterClasses.isEmpty()) {
- List<String> filterClassesList = Arrays.asList(filterClasses.split(SPACES_COMMA_SPACES));
- for (String filterClass : filterClassesList) {
- service.addFilterClass(restUriPath, filterClass);
- }
- }
+ private void addFilterClasses(PropertyUtils props, HttpServletServer service, final String restUriPath) {
- if (restClasses != null && !restClasses.isEmpty()) {
- List<String> restClassesList = Arrays.asList(restClasses.split(SPACES_COMMA_SPACES));
- for (String restClass : restClassesList) {
- service.addServletClass(restUriPath, restClass);
- }
- }
+ final String filterClasses =
+ props.getString(PolicyEndPointProperties.PROPERTY_HTTP_FILTER_CLASSES_SUFFIX, null);
- if (restPackages != null && !restPackages.isEmpty()) {
- List<String> restPackageList = Arrays.asList(restPackages.split(SPACES_COMMA_SPACES));
- for (String restPackage : restPackageList) {
- service.addServletPackage(restUriPath, restPackage);
- }
+ if (!StringUtils.isBlank(filterClasses)) {
+ for (String filterClass : filterClasses.split(SPACES_COMMA_SPACES)) {
+ service.addFilterClass(restUriPath, filterClass);
}
+ }
+ }
+
+ private void addServletClasses(PropertyUtils props, HttpServletServer service, final String restUriPath) {
+
+ final String restClasses = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, null);
- serviceList.add(service);
+ if (!StringUtils.isBlank(restClasses)) {
+ for (String restClass : restClasses.split(SPACES_COMMA_SPACES)) {
+ service.addServletClass(restUriPath, restClass);
+ }
}
+ }
- return serviceList;
+ private void addServletPackages(PropertyUtils props, HttpServletServer service, final String restUriPath) {
+
+ final String restPackages = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX, null);
+
+ if (!StringUtils.isBlank(restPackages)) {
+ for (String restPackage : restPackages.split(SPACES_COMMA_SPACES)) {
+ service.addServletPackage(restUriPath, restPackage);
+ }
+ }
}
@Override
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java
new file mode 100644
index 00000000..6b44e5c8
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java
@@ -0,0 +1,122 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSourceFactory;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+
+public class DmaapPropertyUtils {
+
+ /**
+ * Maps a topic property to a DME property.
+ */
+ private static final Map<String,String> PROP_TO_DME;
+
+ static {
+ Map<String,String> map = new HashMap<>();
+
+ map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX,
+ DmaapTopicSourceFactory.DME2_ROUTE_OFFER_PROPERTY);
+
+ map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX,
+ DmaapTopicSourceFactory.DME2_READ_TIMEOUT_PROPERTY);
+
+ map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX,
+ DmaapTopicSourceFactory.DME2_EP_CONN_TIMEOUT_PROPERTY);
+
+ map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX,
+ DmaapTopicSourceFactory.DME2_ROUNDTRIP_TIMEOUT_PROPERTY);
+
+ map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX,
+ DmaapTopicSourceFactory.DME2_VERSION_PROPERTY);
+
+ map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX,
+ DmaapTopicSourceFactory.DME2_SUBCONTEXT_PATH_PROPERTY);
+
+ map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX,
+ DmaapTopicSourceFactory.DME2_SESSION_STICKINESS_REQUIRED_PROPERTY);
+
+ PROP_TO_DME = Collections.unmodifiableMap(map);
+ }
+
+ private DmaapPropertyUtils() {
+ // do nothing
+ }
+
+ /**
+ * Makes a topic builder, configuring it with properties that are common to both
+ * sources and sinks.
+ *
+ * @param props properties to be used to configure the builder
+ * @param topic topic being configured
+ * @param servers target servers
+ * @return a topic builder
+ */
+ public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) {
+
+ /* Additional DME2 Properties */
+
+ Map<String, String> dme2AdditionalProps = new HashMap<>();
+
+ for (Map.Entry<String, String> ent : PROP_TO_DME.entrySet()) {
+ String propName = ent.getKey();
+ String value = props.getString(propName, null);
+
+ if (!StringUtils.isBlank(value)) {
+ String dmeName = ent.getValue();
+ dme2AdditionalProps.put(dmeName, value);
+ }
+ }
+
+ final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
+
+ return BusTopicParams.builder()
+ .servers(serverList)
+ .topic(topic)
+ .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX,
+ topic))
+ .apiKey(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX, null))
+ .apiSecret(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX, null))
+ .userName(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX, null))
+ .password(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX, null))
+ .environment(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX,
+ null))
+ .aftEnvironment(props.getString(
+ PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX, null))
+ .partner(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX, null))
+ .latitude(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX, null))
+ .longitude(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX, null))
+ .additionalProps(dme2AdditionalProps)
+ .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true))
+ .useHttps(props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false))
+ .allowSelfSignedCerts(props.getBoolean(
+ PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, false));
+ }
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/PropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/PropertyUtils.java
new file mode 100644
index 00000000..265346c9
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/PropertyUtils.java
@@ -0,0 +1,135 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.utils;
+
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Utilities for extracting property values and converting them to other types.
+ */
+public class PropertyUtils {
+ /**
+ * Properties on which to work.
+ */
+ private Properties properties;
+
+ /**
+ * Prefix to prepend to property names.
+ */
+ private String prefix;
+
+ /**
+ * Function to invoke if a property value is invalid.
+ */
+ private TriConsumer<String, String, Exception> invalidHandler;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param properties properties on which to work
+ * @param prefix prefix to prepend to property names
+ * @param invalidHandler function to invoke if a property value is invalid
+ */
+ public PropertyUtils(Properties properties, String prefix, TriConsumer<String, String, Exception> invalidHandler) {
+ this.properties = properties;
+ this.prefix = prefix;
+ this.invalidHandler = invalidHandler;
+ }
+
+ /**
+ * Gets a string property.
+ *
+ * @param propName name of the property whose value is to be retrieved
+ * @param defaultValue value to use if the property value is empty or does not exist
+ * @return the property's value
+ */
+ public String getString(String propName, String defaultValue) {
+ String propValue = getProperty(propName);
+ return (StringUtils.isBlank(propValue) ? defaultValue : propValue);
+ }
+
+ /**
+ * Gets a boolean property.
+ *
+ * @param propName name of the property whose value is to be retrieved
+ * @param defaultValue value to use if the property value is empty or does not exist
+ * @return the property's value
+ */
+ public boolean getBoolean(String propName, boolean defaultValue) {
+ String propValue = getProperty(propName);
+
+ if (!StringUtils.isBlank(propValue)) {
+ return Boolean.parseBoolean(propValue);
+ }
+
+ return defaultValue;
+ }
+
+ /**
+ * Gets an integer property.
+ *
+ * @param propName name of the property whose value is to be retrieved
+ * @param defaultValue value to use if the property value is empty or does not exist
+ * @return the property's value
+ */
+ public int getInteger(String propName, int defaultValue) {
+ String propValue = getProperty(propName);
+
+ if (!StringUtils.isBlank(propValue)) {
+ try {
+ return Integer.parseInt(propValue);
+
+ } catch (NumberFormatException nfe) {
+ invalidHandler.accept(getFullName(propName), propValue, nfe);
+ }
+ }
+
+ return defaultValue;
+ }
+
+
+ /**
+ * Gets a property's value.
+ *
+ * @param propName name of the property whose value is to be retrieved
+ * @return the property's value, or {@code null} if it does not exist
+ */
+ private String getProperty(String propName) {
+ return properties.getProperty(getFullName(propName));
+ }
+
+ /**
+ * Gets the full property name, with the prefix prepended.
+ *
+ * @param propName property name, without the prefix
+ * @return the full property name
+ */
+ private String getFullName(String propName) {
+ return prefix + propName;
+ }
+
+ @FunctionalInterface
+ public static interface TriConsumer<A,B,C> {
+ public void accept(A propName, B propValue, C exception);
+ }
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/UebPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/UebPropertyUtils.java
new file mode 100644
index 00000000..d0217518
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/UebPropertyUtils.java
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+
+public class UebPropertyUtils {
+
+ private UebPropertyUtils() {
+ // do nothing
+ }
+
+ /**
+ * Makes a topic builder, configuring it with properties that are common to both
+ * sources and sinks.
+ *
+ * @param props properties to be used to configure the builder
+ * @param topic topic being configured
+ * @param servers target servers
+ * @return a topic builder
+ */
+ public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) {
+
+ final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
+
+ return BusTopicParams.builder()
+ .servers(serverList)
+ .topic(topic)
+ .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX,
+ topic))
+ .apiKey(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX, null))
+ .apiSecret(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX, null))
+ .consumerGroup(props.getString(
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
+ .consumerInstance(props.getString(
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
+ .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true))
+ .useHttps(props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false))
+ .allowSelfSignedCerts(props.getBoolean(
+ PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX,
+ false));
+ }
+}