From 67fcc6f6abb7904ecd4b4444fa23b355cf9fd4ae Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Mon, 24 Jun 2019 15:46:36 -0400 Subject: Fix some sonar issues in policy-endpoints Refactored various classes to reduce cyclomatic complexity. Introduced some endpoint utility classes to facilitate extraction and conversion of property values, and populating of common "builder" values. Change-Id: Ie1c91cd94cb54700dc9127f72780b4d94b82ec39 Issue-ID: POLICY-1791 Signed-off-by: Jim Hahn --- .../comm/bus/IndexedDmaapTopicSinkFactory.java | 200 +++-------------- .../comm/bus/IndexedDmaapTopicSourceFactory.java | 249 ++++----------------- .../event/comm/bus/IndexedUebTopicSinkFactory.java | 104 +++------ .../comm/bus/IndexedUebTopicSourceFactory.java | 144 ++++-------- .../event/comm/bus/internal/BusPublisher.java | 152 +++++++------ .../bus/internal/SingleThreadedBusTopicSource.java | 31 +-- 6 files changed, 244 insertions(+), 636 deletions(-) (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java index 4ccf08de..6f0753b3 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,15 +21,15 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; - +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils; +import org.onap.policy.common.endpoints.utils.PropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +53,7 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { @Override public DmaapTopicSink build(BusTopicParams busTopicParams) { - if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { + if (StringUtils.isBlank(busTopicParams.getTopic())) { throw new IllegalArgumentException(MISSING_TOPIC); } @@ -86,178 +86,44 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { public List build(Properties properties) { String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS); - if (writeTopics == null || writeTopics.isEmpty()) { + if (StringUtils.isBlank(writeTopics)) { logger.info("{}: no topic for DMaaP Sink", this); return new ArrayList<>(); } - List writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); List newDmaapTopicSinks = new ArrayList<>(); synchronized (this) { - for (String topic : writeTopicList) { - if (this.dmaapTopicWriters.containsKey(topic)) { - newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic)); - continue; - } - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - List serverList; - if (servers != null && !servers.isEmpty()) { - serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - } else { - serverList = new ArrayList<>(); - } - - final String effectiveTopic = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic); - - final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - final String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); - final String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); - - final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); - - final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - - /* DME2 Properties */ - - final String dme2Environment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - - final String dme2AftEnvironment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - - final String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); - - final String dme2RouteOffer = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); - - final String dme2Latitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - - final String dme2Longitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - - final String dme2EpReadTimeoutMs = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); - - final String dme2EpConnTimeout = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); - - final String dme2RoundtripTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); - - final String dme2Version = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); - - final String dme2SubContextPath = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); - - final String dme2SessionStickinessRequired = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); - - Map dme2AdditionalProps = new HashMap<>(); - - if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); - } - if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { - dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); - } - if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); - } - if (dme2Version != null && !dme2Version.isEmpty()) { - dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); - } - if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { - dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); - } - if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { - dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); - } - - if (servers == null || servers.isEmpty()) { - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - continue; - } - - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .effectiveTopic(effectiveTopic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .userName(aafMechId) - .password(aafPassword) - .partitionId(partitionKey) - .environment(dme2Environment) - .aftEnvironment(dme2AftEnvironment) - .partner(dme2Partner) - .latitude(dme2Latitude) - .longitude(dme2Longitude) - .additionalProps(dme2AdditionalProps) - .managed(managed) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - newDmaapTopicSinks.add(dmaapTopicSink); + for (String topic : writeTopics.split("\\s*,\\s*")) { + addTopic(newDmaapTopicSinks, properties, topic); } return newDmaapTopicSinks; } } + private void addTopic(List newDmaapTopicSinks, Properties properties, String topic) { + if (this.dmaapTopicWriters.containsKey(topic)) { + newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic)); + return; + } + + String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic; + + PropertyUtils props = new PropertyUtils(properties, topicPrefix, + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + + String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (StringUtils.isBlank(servers)) { + logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); + return; + } + + DmaapTopicSink dmaapTopicSink = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers) + .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null)) + .build()); + + newDmaapTopicSinks.add(dmaapTopicSink); + } + /** * Makes a new sink. * diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java index ddc3321f..c895a409 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,15 +21,15 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; - +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils; +import org.onap.policy.common.endpoints.utils.PropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,214 +75,15 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { public List build(Properties properties) { String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - if (readTopics == null || readTopics.isEmpty()) { + if (StringUtils.isBlank(readTopics)) { logger.info("{}: no topic for DMaaP Source", this); return new ArrayList<>(); } - List readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); List dmaapTopicSourceLst = new ArrayList<>(); synchronized (this) { - for (String topic : readTopicList) { - if (this.dmaapTopicSources.containsKey(topic)) { - dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - List serverList; - if (servers != null && !servers.isEmpty()) { - serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - } else { - serverList = new ArrayList<>(); - } - - final String effectiveTopic = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic); - - final String apiKey = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - - final String apiSecret = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - final String aafMechId = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); - - final String aafPassword = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); - - final String consumerGroup = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); - - final String consumerInstance = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); - - final String fetchTimeoutString = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); - - /* DME2 Properties */ - - final String dme2Environment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - - final String dme2AftEnvironment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - - final String dme2Partner = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); - - final String dme2RouteOffer = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); - - final String dme2Latitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - - final String dme2Longitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - - final String dme2EpReadTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); - - final String dme2EpConnTimeout = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); - - final String dme2RoundtripTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); - - final String dme2Version = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); - - final String dme2SubContextPath = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); - - final String dme2SessionStickinessRequired = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); - - Map dme2AdditionalProps = new HashMap<>(); - - if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); - } - if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { - dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); - } - if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); - } - if (dme2Version != null && !dme2Version.isEmpty()) { - dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); - } - if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { - dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); - } - if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { - dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); - } - - if (servers == null || servers.isEmpty()) { - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - continue; - } - - int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH; - if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { - try { - fetchTimeout = Integer.parseInt(fetchTimeoutString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, - topic); - } - } - - String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); - int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH; - if (fetchLimitString != null && !fetchLimitString.isEmpty()) { - try { - fetchLimit = Integer.parseInt(fetchLimitString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, - topic); - } - } - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - - DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .effectiveTopic(effectiveTopic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .userName(aafMechId) - .password(aafPassword) - .consumerGroup(consumerGroup) - .consumerInstance(consumerInstance) - .fetchTimeout(fetchTimeout) - .fetchLimit(fetchLimit) - .environment(dme2Environment) - .aftEnvironment(dme2AftEnvironment) - .partner(dme2Partner) - .latitude(dme2Latitude) - .longitude(dme2Longitude) - .additionalProps(dme2AdditionalProps) - .managed(managed) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - dmaapTopicSourceLst.add(uebTopicSource); + for (String topic : readTopics.split("\\s*,\\s*")) { + addTopic(dmaapTopicSourceLst, properties, topic); } } return dmaapTopicSourceLst; @@ -308,9 +109,41 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { return this.build(servers, topic, null, null); } + private void addTopic(List dmaapTopicSourceLst, Properties properties, String topic) { + if (this.dmaapTopicSources.containsKey(topic)) { + dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); + return; + } + + String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic; + + PropertyUtils props = new PropertyUtils(properties, topicPrefix, + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + + String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (StringUtils.isBlank(servers)) { + logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); + return; + } + + DmaapTopicSource uebTopicSource = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers) + .consumerGroup(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null)) + .consumerInstance(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null)) + .fetchTimeout(props.getInteger( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, + DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)) + .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, + DmaapTopicSource.DEFAULT_LIMIT_FETCH)) + .build()); + + dmaapTopicSourceLst.add(uebTopicSource); + } + /** * Makes a new source. - * + * * @param busTopicParams parameters to use to configure the source * @return a new source */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java index 62437823..150a02c0 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,14 +21,15 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Properties; - +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.PropertyUtils; +import org.onap.policy.common.endpoints.utils.UebPropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +56,7 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { throw new IllegalArgumentException("UEB Server(s) must be provided"); } - if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { + if (StringUtils.isBlank(busTopicParams.getTopic())) { throw new IllegalArgumentException(MISSING_TOPIC); } @@ -91,82 +92,43 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { public List build(Properties properties) { String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS); - if (writeTopics == null || writeTopics.isEmpty()) { + if (StringUtils.isBlank(writeTopics)) { logger.info("{}: no topic for UEB Sink", this); return new ArrayList<>(); } - List writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); List newUebTopicSinks = new ArrayList<>(); synchronized (this) { - for (String topic : writeTopicList) { - if (this.uebTopicSinks.containsKey(topic)) { - newUebTopicSinks.add(this.uebTopicSinks.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (servers == null || servers.isEmpty()) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - continue; - } - - final List serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - final String effectiveTopic = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic); - final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - UebTopicSink uebTopicWriter = this.build(BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .effectiveTopic(effectiveTopic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .partitionId(partitionKey) - .managed(managed) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - newUebTopicSinks.add(uebTopicWriter); + for (String topic : writeTopics.split("\\s*,\\s*")) { + addTopic(newUebTopicSinks, topic, properties); } return newUebTopicSinks; } } + private void addTopic(List newUebTopicSinks, String topic, Properties properties) { + if (this.uebTopicSinks.containsKey(topic)) { + newUebTopicSinks.add(this.uebTopicSinks.get(topic)); + return; + } + + String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic; + + PropertyUtils props = new PropertyUtils(properties, topicPrefix, + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + + String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (StringUtils.isBlank(servers)) { + logger.error("{}: no UEB servers configured for sink {}", this, topic); + return; + } + + UebTopicSink uebTopicWriter = this.build(UebPropertyUtils.makeBuilder(props, topic, servers) + .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null)) + .build()); + newUebTopicSinks.add(uebTopicWriter); + } + @Override public void destroy(String topic) { @@ -221,7 +183,7 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { /** * Makes a new sink. - * + * * @param busTopicParams parameters to use to configure the sink * @return a new sink */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java index f3ef8fdc..6655aa12 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,14 +21,15 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Properties; - +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.PropertyUtils; +import org.onap.policy.common.endpoints.utils.UebPropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,110 +78,15 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { public List build(Properties properties) { String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS); - if (readTopics == null || readTopics.isEmpty()) { + if (StringUtils.isBlank(readTopics)) { logger.info("{}: no topic for UEB Source", this); return new ArrayList<>(); } - List readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); List newUebTopicSources = new ArrayList<>(); synchronized (this) { - for (String topic : readTopicList) { - if (this.uebTopicSources.containsKey(topic)) { - newUebTopicSources.add(this.uebTopicSources.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - if (servers == null || servers.isEmpty()) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - continue; - } - - final List serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - final String effectiveTopic = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic); - - final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - - final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - final String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); - - final String consumerInstance = properties.getProperty( - PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); - - String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); - int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH; - if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { - try { - fetchTimeout = Integer.parseInt(fetchTimeoutString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, - topic); - } - } - - String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); - int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH; - if (fetchLimitString != null && !fetchLimitString.isEmpty()) { - try { - fetchLimit = Integer.parseInt(fetchLimitString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, - topic); - } - } - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - UebTopicSource uebTopicSource = this.build(BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .effectiveTopic(effectiveTopic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .consumerGroup(consumerGroup) - .consumerInstance(consumerInstance) - .fetchTimeout(fetchTimeout) - .fetchLimit(fetchLimit) - .managed(managed) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts).build()); - newUebTopicSources.add(uebTopicSource); + for (String topic : readTopics.split("\\s*,\\s*")) { + addTopic(newUebTopicSources, topic, properties); } } return newUebTopicSources; @@ -206,9 +112,41 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { return this.build(servers, topic, null, null); } + private void addTopic(List newUebTopicSources, String topic, Properties properties) { + if (this.uebTopicSources.containsKey(topic)) { + newUebTopicSources.add(this.uebTopicSources.get(topic)); + return; + } + + String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic; + + PropertyUtils props = new PropertyUtils(properties, topicPrefix, + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + + String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (StringUtils.isBlank(servers)) { + logger.error("{}: no UEB servers configured for sink {}", this, topic); + return; + } + + UebTopicSource uebTopicSource = this.build(UebPropertyUtils.makeBuilder(props, topic, servers) + .consumerGroup(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null)) + .consumerInstance(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null)) + .fetchTimeout(props.getInteger( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, + UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH)) + .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, + UebTopicSource.DEFAULT_LIMIT_FETCH)) + .build()); + + newUebTopicSources.add(uebTopicSource); + } + /** * Makes a new source. - * + * * @param busTopicParams parameters to use to configure the source * @return a new source */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 1c85fa97..67adf3b4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -178,27 +178,50 @@ public interface BusPublisher { } + configureProtocol(topic, protocol, servers, useHttps); + + this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); + + this.publisher.setUsername(username); + this.publisher.setPassword(password); + + props = new Properties(); + + props.setProperty("Protocol", (useHttps ? "https" : "http")); + props.setProperty("contenttype", "application/json"); + props.setProperty("username", username); + props.setProperty("password", password); + + props.setProperty("topic", topic); + + this.publisher.setProps(props); + + if (protocol == ProtocolTypeConstants.AAF_AUTH) { + this.publisher.setHost(servers.get(0)); + } + + logger.info("{}: CREATION: using protocol {}", this, protocol.getValue()); + } + + private void configureProtocol(String topic, ProtocolTypeConstants protocol, List servers, + boolean useHttps) { + if (protocol == ProtocolTypeConstants.AAF_AUTH) { if (servers == null || servers.isEmpty()) { throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided"); } ArrayList dmaapServers = new ArrayList<>(); - if (useHttps) { - for (String server : servers) { - dmaapServers.add(server + ":3905"); - } - - } else { - for (String server : servers) { - dmaapServers.add(server + ":3904"); - } + String port = useHttps ? ":3905" : ":3904"; + for (String server : servers) { + dmaapServers.add(server + port); } this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + } else if (protocol == ProtocolTypeConstants.DME2) { ArrayList dmaapServers = new ArrayList<>(); dmaapServers.add("0.0.0.0:3904"); @@ -206,36 +229,10 @@ public interface BusPublisher { this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - } else { - throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol); - } - - this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); - - this.publisher.setUsername(username); - this.publisher.setPassword(password); - - props = new Properties(); - if (useHttps) { - props.setProperty("Protocol", "https"); } else { - props.setProperty("Protocol", "http"); - } - - props.setProperty("contenttype", "application/json"); - props.setProperty("username", username); - props.setProperty("password", password); - - props.setProperty("topic", topic); - - this.publisher.setProps(props); - - if (protocol == ProtocolTypeConstants.AAF_AUTH) { - this.publisher.setHost(servers.get(0)); + throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol); } - - logger.info("{}: CREATION: using protocol {}", this, protocol.getValue()); } @Override @@ -300,38 +297,12 @@ public interface BusPublisher { super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(), busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps()); - String dme2RouteOffer = null; - if (busTopicParams.isAdditionalPropsValid()) { - dme2RouteOffer = busTopicParams.getAdditionalProps().get( - DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - } - if (busTopicParams.isEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isAftEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isLatitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (busTopicParams.isLongitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } + String dme2RouteOffer = busTopicParams.isAdditionalPropsValid() + ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY) + : null; - if ((busTopicParams.isPartnerInvalid()) - && StringUtils.isBlank(dme2RouteOffer)) { - throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } + validateParams(busTopicParams, dme2RouteOffer); String serviceName = busTopicParams.getServers().get(0); @@ -366,19 +337,52 @@ public interface BusPublisher { props.setProperty("MethodType", "POST"); if (busTopicParams.isAdditionalPropsValid()) { - for (Map.Entry entry : busTopicParams.getAdditionalProps().entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - - if (value != null) { - props.setProperty(key, value); - } - } + addAdditionalProps(busTopicParams); } this.publisher.setProps(props); } + private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) { + if (busTopicParams.isEnvironmentInvalid()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + } + if (busTopicParams.isAftEnvironmentInvalid()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + } + if (busTopicParams.isLatitudeInvalid()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + } + if (busTopicParams.isLongitudeInvalid()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + } + + if ((busTopicParams.isPartnerInvalid()) + && StringUtils.isBlank(dme2RouteOffer)) { + throw new IllegalArgumentException( + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + busTopicParams.getTopic() + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic() + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + } + + private void addAdditionalProps(BusTopicParams busTopicParams) { + for (Map.Entry entry : busTopicParams.getAdditionalProps().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + + if (value != null) { + props.setProperty(key, value); + } + } + } + private IllegalArgumentException parmException(String topic, String propnm) { return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + propnm + " property for DME2 in DMaaP"); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 98e30e27..0953465b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -21,6 +21,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import java.io.IOException; import java.net.MalformedURLException; import java.util.UUID; @@ -223,19 +224,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase public void run() { while (this.alive) { try { - for (String event : this.consumer.fetch()) { - synchronized (this) { - this.recentEvents.add(event); - } - - NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event); - - broadcast(event); - - if (!this.alive) { - break; - } - } + fetchAllMessages(); } catch (Exception e) { logger.error("{}: cannot fetch because of ", this, e.getMessage(), e); } @@ -244,6 +233,22 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase logger.info("{}: exiting thread", this); } + private void fetchAllMessages() throws InterruptedException, IOException { + for (String event : this.consumer.fetch()) { + synchronized (this) { + this.recentEvents.add(event); + } + + NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event); + + broadcast(event); + + if (!this.alive) { + return; + } + } + } + @Override public boolean offer(String event) { if (!this.alive) { -- cgit 1.2.3-korg