diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java | 249 |
1 files changed, 41 insertions, 208 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java index ddc3321f..c895a409 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,15 +21,15 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; - +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils; +import org.onap.policy.common.endpoints.utils.PropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,214 +75,15 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { public List<DmaapTopicSource> build(Properties properties) { String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - if (readTopics == null || readTopics.isEmpty()) { + if (StringUtils.isBlank(readTopics)) { logger.info("{}: no topic for DMaaP Source", this); return new ArrayList<>(); } - List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>(); synchronized (this) { - for (String topic : readTopicList) { - if (this.dmaapTopicSources.containsKey(topic)) { - dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - List<String> serverList; - if (servers != null && !servers.isEmpty()) { - serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - } else { - serverList = new ArrayList<>(); - } - - final String effectiveTopic = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic); - - final String apiKey = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - - final String apiSecret = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - final String aafMechId = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); - - final String aafPassword = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); - - final String consumerGroup = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); - - final String consumerInstance = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); - - final String fetchTimeoutString = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); - - /* DME2 Properties */ - - final String dme2Environment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - - final String dme2AftEnvironment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - - final String dme2Partner = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); - - final String dme2RouteOffer = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); - - final String dme2Latitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - - final String dme2Longitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - - final String dme2EpReadTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); - - final String dme2EpConnTimeout = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); - - final String dme2RoundtripTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); - - final String dme2Version = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); - - final String dme2SubContextPath = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); - - final String dme2SessionStickinessRequired = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); - - Map<String, String> dme2AdditionalProps = new HashMap<>(); - - if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); - } - if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { - dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); - } - if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); - } - if (dme2Version != null && !dme2Version.isEmpty()) { - dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); - } - if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { - dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); - } - if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { - dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); - } - - if (servers == null || servers.isEmpty()) { - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - continue; - } - - int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH; - if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { - try { - fetchTimeout = Integer.parseInt(fetchTimeoutString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, - topic); - } - } - - String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); - int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH; - if (fetchLimitString != null && !fetchLimitString.isEmpty()) { - try { - fetchLimit = Integer.parseInt(fetchLimitString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, - topic); - } - } - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - - DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .effectiveTopic(effectiveTopic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .userName(aafMechId) - .password(aafPassword) - .consumerGroup(consumerGroup) - .consumerInstance(consumerInstance) - .fetchTimeout(fetchTimeout) - .fetchLimit(fetchLimit) - .environment(dme2Environment) - .aftEnvironment(dme2AftEnvironment) - .partner(dme2Partner) - .latitude(dme2Latitude) - .longitude(dme2Longitude) - .additionalProps(dme2AdditionalProps) - .managed(managed) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - dmaapTopicSourceLst.add(uebTopicSource); + for (String topic : readTopics.split("\\s*,\\s*")) { + addTopic(dmaapTopicSourceLst, properties, topic); } } return dmaapTopicSourceLst; @@ -308,9 +109,41 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { return this.build(servers, topic, null, null); } + private void addTopic(List<DmaapTopicSource> dmaapTopicSourceLst, Properties properties, String topic) { + if (this.dmaapTopicSources.containsKey(topic)) { + dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); + return; + } + + String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic; + + PropertyUtils props = new PropertyUtils(properties, topicPrefix, + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + + String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (StringUtils.isBlank(servers)) { + logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); + return; + } + + DmaapTopicSource uebTopicSource = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers) + .consumerGroup(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null)) + .consumerInstance(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null)) + .fetchTimeout(props.getInteger( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, + DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)) + .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, + DmaapTopicSource.DEFAULT_LIMIT_FETCH)) + .build()); + + dmaapTopicSourceLst.add(uebTopicSource); + } + /** * Makes a new source. - * + * * @param busTopicParams parameters to use to configure the source * @return a new source */ |