diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org')
12 files changed, 712 insertions, 826 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java index 4ccf08de..6f0753b3 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,15 +21,15 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; - +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils; +import org.onap.policy.common.endpoints.utils.PropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +53,7 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { @Override public DmaapTopicSink build(BusTopicParams busTopicParams) { - if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { + if (StringUtils.isBlank(busTopicParams.getTopic())) { throw new IllegalArgumentException(MISSING_TOPIC); } @@ -86,178 +86,44 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { public List<DmaapTopicSink> build(Properties properties) { String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS); - if (writeTopics == null || writeTopics.isEmpty()) { + if (StringUtils.isBlank(writeTopics)) { logger.info("{}: no topic for DMaaP Sink", this); return new ArrayList<>(); } - List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>(); synchronized (this) { - for (String topic : writeTopicList) { - if (this.dmaapTopicWriters.containsKey(topic)) { - newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic)); - continue; - } - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - List<String> serverList; - if (servers != null && !servers.isEmpty()) { - serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - } else { - serverList = new ArrayList<>(); - } - - final String effectiveTopic = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic); - - final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - final String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); - final String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); - - final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); - - final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - - /* DME2 Properties */ - - final String dme2Environment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - - final String dme2AftEnvironment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - - final String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); - - final String dme2RouteOffer = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); - - final String dme2Latitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - - final String dme2Longitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - - final String dme2EpReadTimeoutMs = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); - - final String dme2EpConnTimeout = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); - - final String dme2RoundtripTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); - - final String dme2Version = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); - - final String dme2SubContextPath = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); - - final String dme2SessionStickinessRequired = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); - - Map<String, String> dme2AdditionalProps = new HashMap<>(); - - if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); - } - if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { - dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); - } - if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); - } - if (dme2Version != null && !dme2Version.isEmpty()) { - dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); - } - if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { - dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); - } - if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { - dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); - } - - if (servers == null || servers.isEmpty()) { - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - continue; - } - - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .effectiveTopic(effectiveTopic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .userName(aafMechId) - .password(aafPassword) - .partitionId(partitionKey) - .environment(dme2Environment) - .aftEnvironment(dme2AftEnvironment) - .partner(dme2Partner) - .latitude(dme2Latitude) - .longitude(dme2Longitude) - .additionalProps(dme2AdditionalProps) - .managed(managed) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - newDmaapTopicSinks.add(dmaapTopicSink); + for (String topic : writeTopics.split("\\s*,\\s*")) { + addTopic(newDmaapTopicSinks, properties, topic); } return newDmaapTopicSinks; } } + private void addTopic(List<DmaapTopicSink> newDmaapTopicSinks, Properties properties, String topic) { + if (this.dmaapTopicWriters.containsKey(topic)) { + newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic)); + return; + } + + String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic; + + PropertyUtils props = new PropertyUtils(properties, topicPrefix, + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + + String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (StringUtils.isBlank(servers)) { + logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); + return; + } + + DmaapTopicSink dmaapTopicSink = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers) + .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null)) + .build()); + + newDmaapTopicSinks.add(dmaapTopicSink); + } + /** * Makes a new sink. * diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java index ddc3321f..c895a409 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,15 +21,15 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; - +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.DmaapPropertyUtils; +import org.onap.policy.common.endpoints.utils.PropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,214 +75,15 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { public List<DmaapTopicSource> build(Properties properties) { String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - if (readTopics == null || readTopics.isEmpty()) { + if (StringUtils.isBlank(readTopics)) { logger.info("{}: no topic for DMaaP Source", this); return new ArrayList<>(); } - List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>(); synchronized (this) { - for (String topic : readTopicList) { - if (this.dmaapTopicSources.containsKey(topic)) { - dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - List<String> serverList; - if (servers != null && !servers.isEmpty()) { - serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - } else { - serverList = new ArrayList<>(); - } - - final String effectiveTopic = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic); - - final String apiKey = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - - final String apiSecret = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - final String aafMechId = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); - - final String aafPassword = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); - - final String consumerGroup = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); - - final String consumerInstance = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); - - final String fetchTimeoutString = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); - - /* DME2 Properties */ - - final String dme2Environment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - - final String dme2AftEnvironment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - - final String dme2Partner = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); - - final String dme2RouteOffer = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); - - final String dme2Latitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - - final String dme2Longitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - - final String dme2EpReadTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); - - final String dme2EpConnTimeout = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); - - final String dme2RoundtripTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); - - final String dme2Version = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); - - final String dme2SubContextPath = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); - - final String dme2SessionStickinessRequired = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); - - Map<String, String> dme2AdditionalProps = new HashMap<>(); - - if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); - } - if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { - dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); - } - if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); - } - if (dme2Version != null && !dme2Version.isEmpty()) { - dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); - } - if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { - dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); - } - if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { - dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); - } - - if (servers == null || servers.isEmpty()) { - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - continue; - } - - int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH; - if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { - try { - fetchTimeout = Integer.parseInt(fetchTimeoutString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, - topic); - } - } - - String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); - int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH; - if (fetchLimitString != null && !fetchLimitString.isEmpty()) { - try { - fetchLimit = Integer.parseInt(fetchLimitString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, - topic); - } - } - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - - DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .effectiveTopic(effectiveTopic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .userName(aafMechId) - .password(aafPassword) - .consumerGroup(consumerGroup) - .consumerInstance(consumerInstance) - .fetchTimeout(fetchTimeout) - .fetchLimit(fetchLimit) - .environment(dme2Environment) - .aftEnvironment(dme2AftEnvironment) - .partner(dme2Partner) - .latitude(dme2Latitude) - .longitude(dme2Longitude) - .additionalProps(dme2AdditionalProps) - .managed(managed) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - dmaapTopicSourceLst.add(uebTopicSource); + for (String topic : readTopics.split("\\s*,\\s*")) { + addTopic(dmaapTopicSourceLst, properties, topic); } } return dmaapTopicSourceLst; @@ -308,9 +109,41 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { return this.build(servers, topic, null, null); } + private void addTopic(List<DmaapTopicSource> dmaapTopicSourceLst, Properties properties, String topic) { + if (this.dmaapTopicSources.containsKey(topic)) { + dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); + return; + } + + String topicPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic; + + PropertyUtils props = new PropertyUtils(properties, topicPrefix, + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + + String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (StringUtils.isBlank(servers)) { + logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); + return; + } + + DmaapTopicSource uebTopicSource = this.build(DmaapPropertyUtils.makeBuilder(props, topic, servers) + .consumerGroup(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null)) + .consumerInstance(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null)) + .fetchTimeout(props.getInteger( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, + DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)) + .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, + DmaapTopicSource.DEFAULT_LIMIT_FETCH)) + .build()); + + dmaapTopicSourceLst.add(uebTopicSource); + } + /** * Makes a new source. - * + * * @param busTopicParams parameters to use to configure the source * @return a new source */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java index 62437823..150a02c0 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,14 +21,15 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Properties; - +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.PropertyUtils; +import org.onap.policy.common.endpoints.utils.UebPropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +56,7 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { throw new IllegalArgumentException("UEB Server(s) must be provided"); } - if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { + if (StringUtils.isBlank(busTopicParams.getTopic())) { throw new IllegalArgumentException(MISSING_TOPIC); } @@ -91,82 +92,43 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { public List<UebTopicSink> build(Properties properties) { String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS); - if (writeTopics == null || writeTopics.isEmpty()) { + if (StringUtils.isBlank(writeTopics)) { logger.info("{}: no topic for UEB Sink", this); return new ArrayList<>(); } - List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); List<UebTopicSink> newUebTopicSinks = new ArrayList<>(); synchronized (this) { - for (String topic : writeTopicList) { - if (this.uebTopicSinks.containsKey(topic)) { - newUebTopicSinks.add(this.uebTopicSinks.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (servers == null || servers.isEmpty()) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - continue; - } - - final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - final String effectiveTopic = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic); - final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - UebTopicSink uebTopicWriter = this.build(BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .effectiveTopic(effectiveTopic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .partitionId(partitionKey) - .managed(managed) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - newUebTopicSinks.add(uebTopicWriter); + for (String topic : writeTopics.split("\\s*,\\s*")) { + addTopic(newUebTopicSinks, topic, properties); } return newUebTopicSinks; } } + private void addTopic(List<UebTopicSink> newUebTopicSinks, String topic, Properties properties) { + if (this.uebTopicSinks.containsKey(topic)) { + newUebTopicSinks.add(this.uebTopicSinks.get(topic)); + return; + } + + String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic; + + PropertyUtils props = new PropertyUtils(properties, topicPrefix, + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + + String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (StringUtils.isBlank(servers)) { + logger.error("{}: no UEB servers configured for sink {}", this, topic); + return; + } + + UebTopicSink uebTopicWriter = this.build(UebPropertyUtils.makeBuilder(props, topic, servers) + .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null)) + .build()); + newUebTopicSinks.add(uebTopicWriter); + } + @Override public void destroy(String topic) { @@ -221,7 +183,7 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { /** * Makes a new sink. - * + * * @param busTopicParams parameters to use to configure the sink * @return a new sink */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java index f3ef8fdc..6655aa12 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,14 +21,15 @@ package org.onap.policy.common.endpoints.event.comm.bus; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Properties; - +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.PropertyUtils; +import org.onap.policy.common.endpoints.utils.UebPropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,110 +78,15 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { public List<UebTopicSource> build(Properties properties) { String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS); - if (readTopics == null || readTopics.isEmpty()) { + if (StringUtils.isBlank(readTopics)) { logger.info("{}: no topic for UEB Source", this); return new ArrayList<>(); } - List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); List<UebTopicSource> newUebTopicSources = new ArrayList<>(); synchronized (this) { - for (String topic : readTopicList) { - if (this.uebTopicSources.containsKey(topic)) { - newUebTopicSources.add(this.uebTopicSources.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - if (servers == null || servers.isEmpty()) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - continue; - } - - final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - final String effectiveTopic = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic); - - final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - - final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - final String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); - - final String consumerInstance = properties.getProperty( - PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); - - String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); - int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH; - if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { - try { - fetchTimeout = Integer.parseInt(fetchTimeoutString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, - topic); - } - } - - String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); - int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH; - if (fetchLimitString != null && !fetchLimitString.isEmpty()) { - try { - fetchLimit = Integer.parseInt(fetchLimitString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, - topic); - } - } - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - UebTopicSource uebTopicSource = this.build(BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .effectiveTopic(effectiveTopic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .consumerGroup(consumerGroup) - .consumerInstance(consumerInstance) - .fetchTimeout(fetchTimeout) - .fetchLimit(fetchLimit) - .managed(managed) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts).build()); - newUebTopicSources.add(uebTopicSource); + for (String topic : readTopics.split("\\s*,\\s*")) { + addTopic(newUebTopicSources, topic, properties); } } return newUebTopicSources; @@ -206,9 +112,41 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { return this.build(servers, topic, null, null); } + private void addTopic(List<UebTopicSource> newUebTopicSources, String topic, Properties properties) { + if (this.uebTopicSources.containsKey(topic)) { + newUebTopicSources.add(this.uebTopicSources.get(topic)); + return; + } + + String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic; + + PropertyUtils props = new PropertyUtils(properties, topicPrefix, + (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic)); + + String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (StringUtils.isBlank(servers)) { + logger.error("{}: no UEB servers configured for sink {}", this, topic); + return; + } + + UebTopicSource uebTopicSource = this.build(UebPropertyUtils.makeBuilder(props, topic, servers) + .consumerGroup(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null)) + .consumerInstance(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null)) + .fetchTimeout(props.getInteger( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, + UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH)) + .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, + UebTopicSource.DEFAULT_LIMIT_FETCH)) + .build()); + + newUebTopicSources.add(uebTopicSource); + } + /** * Makes a new source. - * + * * @param busTopicParams parameters to use to configure the source * @return a new source */ diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 1c85fa97..67adf3b4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -178,27 +178,50 @@ public interface BusPublisher { } + configureProtocol(topic, protocol, servers, useHttps); + + this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); + + this.publisher.setUsername(username); + this.publisher.setPassword(password); + + props = new Properties(); + + props.setProperty("Protocol", (useHttps ? "https" : "http")); + props.setProperty("contenttype", "application/json"); + props.setProperty("username", username); + props.setProperty("password", password); + + props.setProperty("topic", topic); + + this.publisher.setProps(props); + + if (protocol == ProtocolTypeConstants.AAF_AUTH) { + this.publisher.setHost(servers.get(0)); + } + + logger.info("{}: CREATION: using protocol {}", this, protocol.getValue()); + } + + private void configureProtocol(String topic, ProtocolTypeConstants protocol, List<String> servers, + boolean useHttps) { + if (protocol == ProtocolTypeConstants.AAF_AUTH) { if (servers == null || servers.isEmpty()) { throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided"); } ArrayList<String> dmaapServers = new ArrayList<>(); - if (useHttps) { - for (String server : servers) { - dmaapServers.add(server + ":3905"); - } - - } else { - for (String server : servers) { - dmaapServers.add(server + ":3904"); - } + String port = useHttps ? ":3905" : ":3904"; + for (String server : servers) { + dmaapServers.add(server + port); } this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + } else if (protocol == ProtocolTypeConstants.DME2) { ArrayList<String> dmaapServers = new ArrayList<>(); dmaapServers.add("0.0.0.0:3904"); @@ -206,36 +229,10 @@ public interface BusPublisher { this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build(); this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - } else { - throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol); - } - - this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); - - this.publisher.setUsername(username); - this.publisher.setPassword(password); - - props = new Properties(); - if (useHttps) { - props.setProperty("Protocol", "https"); } else { - props.setProperty("Protocol", "http"); - } - - props.setProperty("contenttype", "application/json"); - props.setProperty("username", username); - props.setProperty("password", password); - - props.setProperty("topic", topic); - - this.publisher.setProps(props); - - if (protocol == ProtocolTypeConstants.AAF_AUTH) { - this.publisher.setHost(servers.get(0)); + throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol); } - - logger.info("{}: CREATION: using protocol {}", this, protocol.getValue()); } @Override @@ -300,38 +297,12 @@ public interface BusPublisher { super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(), busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps()); - String dme2RouteOffer = null; - if (busTopicParams.isAdditionalPropsValid()) { - dme2RouteOffer = busTopicParams.getAdditionalProps().get( - DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - } - if (busTopicParams.isEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isAftEnvironmentInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - } - if (busTopicParams.isLatitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - } - if (busTopicParams.isLongitudeInvalid()) { - throw parmException(busTopicParams.getTopic(), - PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - } + String dme2RouteOffer = busTopicParams.isAdditionalPropsValid() + ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY) + : null; - if ((busTopicParams.isPartnerInvalid()) - && StringUtils.isBlank(dme2RouteOffer)) { - throw new IllegalArgumentException( - "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " - + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic() - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } + validateParams(busTopicParams, dme2RouteOffer); String serviceName = busTopicParams.getServers().get(0); @@ -366,19 +337,52 @@ public interface BusPublisher { props.setProperty("MethodType", "POST"); if (busTopicParams.isAdditionalPropsValid()) { - for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - - if (value != null) { - props.setProperty(key, value); - } - } + addAdditionalProps(busTopicParams); } this.publisher.setProps(props); } + private void validateParams(BusTopicParams busTopicParams, String dme2RouteOffer) { + if (busTopicParams.isEnvironmentInvalid()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + } + if (busTopicParams.isAftEnvironmentInvalid()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + } + if (busTopicParams.isLatitudeInvalid()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + } + if (busTopicParams.isLongitudeInvalid()) { + throw parmException(busTopicParams.getTopic(), + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + } + + if ((busTopicParams.isPartnerInvalid()) + && StringUtils.isBlank(dme2RouteOffer)) { + throw new IllegalArgumentException( + "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + busTopicParams.getTopic() + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic() + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + } + + private void addAdditionalProps(BusTopicParams busTopicParams) { + for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + + if (value != null) { + props.setProperty(key, value); + } + } + } + private IllegalArgumentException parmException(String topic, String propnm) { return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + propnm + " property for DME2 in DMaaP"); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 98e30e27..0953465b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -21,6 +21,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import java.io.IOException; import java.net.MalformedURLException; import java.util.UUID; @@ -223,19 +224,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase public void run() { while (this.alive) { try { - for (String event : this.consumer.fetch()) { - synchronized (this) { - this.recentEvents.add(event); - } - - NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event); - - broadcast(event); - - if (!this.alive) { - break; - } - } + fetchAllMessages(); } catch (Exception e) { logger.error("{}: cannot fetch because of ", this, e.getMessage(), e); } @@ -244,6 +233,22 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase logger.info("{}: exiting thread", this); } + private void fetchAllMessages() throws InterruptedException, IOException { + for (String event : this.consumer.fetch()) { + synchronized (this) { + this.recentEvents.add(event); + } + + NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event); + + broadcast(event); + + if (!this.alive) { + return; + } + } + } + @Override public boolean offer(String event) { if (!this.alive) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java index 5cc0071e..c2d0e400 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,15 +23,14 @@ package org.onap.policy.common.endpoints.http.client; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Properties; - import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.http.client.internal.JerseyClient; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.PropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,78 +69,54 @@ class IndexedHttpClientFactory implements HttpClientFactory { ArrayList<HttpClient> clientList = new ArrayList<>(); String clientNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES); - if (clientNames == null || clientNames.isEmpty()) { + if (StringUtils.isBlank(clientNames)) { return clientList; } - List<String> clientNameList = new ArrayList<>(Arrays.asList(clientNames.split("\\s*,\\s*"))); - - for (String clientName : clientNameList) { - String httpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." - + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - boolean https = false; - if (StringUtils.isNotBlank(httpsString)) { - https = Boolean.parseBoolean(httpsString); - } - - String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." - + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX); - - String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES - + "." + clientName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX); - int port; - try { - if (servicePortString == null || servicePortString.isEmpty()) { - continue; - } - port = Integer.parseInt(servicePortString); - } catch (NumberFormatException nfe) { - logger.error("http-client-factory: cannot parse port {}", servicePortString, nfe); - continue; - } - - String baseUrl = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." - + clientName + PolicyEndPointProperties.PROPERTY_HTTP_URL_SUFFIX); - - String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." - + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX); - - String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." - + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX); - - final String classProv = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES - + "." + clientName + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER); - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." - + clientName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - try { - HttpClient client = - this.build(BusTopicParams.builder() - .clientName(clientName) - .useHttps(https) - .allowSelfSignedCerts(https) - .hostname(hostName) - .port(port) - .basePath(baseUrl) - .userName(userName) - .password(password) - .managed(managed) - .serializationProvider(classProv) - .build()); - clientList.add(client); - } catch (Exception e) { - logger.error("http-client-factory: cannot build client {}", clientName, e); - } + for (String clientName : clientNames.split("\\s*,\\s*")) { + addClient(clientList, clientName, properties); } return clientList; } + private void addClient(ArrayList<HttpClient> clientList, String clientName, Properties properties) { + String clientPrefix = PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + clientName; + + PropertyUtils props = new PropertyUtils(properties, clientPrefix, + (name, value, ex) -> + logger.warn("{}: {} {} is in invalid format for http client {} ", this, name, value, clientName)); + + int port = props.getInteger(PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX, -1); + if (port < 0) { + logger.warn("No HTTP port for client in {}", clientName); + return; + } + + boolean https = props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false); + + try { + HttpClient client = this.build(BusTopicParams.builder() + .clientName(clientName) + .useHttps(https) + .allowSelfSignedCerts(https) + .hostname(props.getString(PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, null)) + .port(port) + .basePath(props.getString(PolicyEndPointProperties.PROPERTY_HTTP_URL_SUFFIX, null)) + .userName(props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, + null)) + .password(props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, + null)) + .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true)) + .serializationProvider(props.getString( + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER, null)) + .build()); + clientList.add(client); + } catch (Exception e) { + logger.error("http-client-factory: cannot build client {}", clientName, e); + } + } + @Override public synchronized HttpClient get(String name) { if (clients.containsKey(name)) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java index d4ccc494..4b73c5c4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/internal/JerseyClient.java @@ -33,6 +33,7 @@ import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Entity; import javax.ws.rs.client.Invocation.Builder; import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; @@ -84,8 +85,6 @@ public class JerseyClient implements HttpClient { public JerseyClient(BusTopicParams busTopicParams) throws KeyManagementException, NoSuchAlgorithmException, ClassNotFoundException { - super(); - if (busTopicParams.isClientNameInvalid()) { throw new IllegalArgumentException("Name must be provided"); } @@ -106,10 +105,23 @@ public class JerseyClient implements HttpClient { this.userName = busTopicParams.getUserName(); this.password = busTopicParams.getPassword(); this.selfSignedCerts = busTopicParams.isAllowSelfSignedCerts(); + this.client = detmClient(); + + if (!StringUtils.isBlank(this.userName) && !StringUtils.isBlank(this.password)) { + HttpAuthenticationFeature authFeature = HttpAuthenticationFeature.basic(userName, password); + this.client.register(authFeature); + } + + this.client.property(ClientProperties.METAINF_SERVICES_LOOKUP_DISABLE, "true"); + + registerSerProviders(busTopicParams.getSerializationProvider()); + + this.baseUrl = (this.https ? "https://" : "http://") + this.hostname + ":" + this.port + "/" + + (this.basePath == null ? "" : this.basePath); + } - StringBuilder tmpBaseUrl = new StringBuilder(); + private Client detmClient() throws NoSuchAlgorithmException, KeyManagementException { if (this.https) { - tmpBaseUrl.append("https://"); ClientBuilder clientBuilder; SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); if (this.selfSignedCerts) { @@ -120,23 +132,11 @@ public class JerseyClient implements HttpClient { sslContext.init(null, null, null); clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext); } - this.client = clientBuilder.build(); - } else { - tmpBaseUrl.append("http://"); - this.client = ClientBuilder.newClient(); - } + return clientBuilder.build(); - if (this.userName != null && !this.userName.isEmpty() && this.password != null && !this.password.isEmpty()) { - HttpAuthenticationFeature authFeature = HttpAuthenticationFeature.basic(userName, password); - this.client.register(authFeature); + } else { + return ClientBuilder.newClient(); } - - registerSerProviders(busTopicParams.getSerializationProvider()); - - this.client.property(ClientProperties.METAINF_SERVICES_LOOKUP_DISABLE, "true"); - - this.baseUrl = tmpBaseUrl.append(this.hostname).append(":").append(this.port).append("/") - .append((this.basePath == null) ? "" : this.basePath).toString(); } /** @@ -146,7 +146,7 @@ public class JerseyClient implements HttpClient { * @throws ClassNotFoundException if the serialization provider cannot be found */ private void registerSerProviders(String serializationProvider) throws ClassNotFoundException { - String providers = (serializationProvider == null || serializationProvider.isEmpty() + String providers = (StringUtils.isBlank(serializationProvider) ? JERSEY_DEFAULT_SERIALIZATION_PROVIDER : serializationProvider); for (String prov : providers.split(",")) { this.client.register(Class.forName(prov)); @@ -155,7 +155,7 @@ public class JerseyClient implements HttpClient { @Override public Response get(String path) { - if (path != null && !path.isEmpty()) { + if (!StringUtils.isBlank(path)) { return this.client.target(this.baseUrl).path(path).request().get(); } else { return this.client.target(this.baseUrl).request().get(); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java index b2c49eae..517ad208 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,12 +21,13 @@ package org.onap.policy.common.endpoints.http.server; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Properties; +import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.http.server.internal.JettyJerseyServer; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.endpoints.utils.PropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,132 +76,111 @@ class IndexedHttpServletServerFactory implements HttpServletServerFactory { ArrayList<HttpServletServer> serviceList = new ArrayList<>(); String serviceNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES); - if (serviceNames == null || serviceNames.isEmpty()) { + if (StringUtils.isBlank(serviceNames)) { logger.warn("No topic for HTTP Service: {}", properties); return serviceList; } - List<String> serviceNameList = Arrays.asList(serviceNames.split(SPACES_COMMA_SPACES)); - - for (String serviceName : serviceNameList) { - String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX); - - int servicePort; - try { - if (servicePortString == null || servicePortString.isEmpty()) { - if (logger.isWarnEnabled()) { - logger.warn("No HTTP port for service in {}", serviceName); - } - continue; - } - servicePort = Integer.parseInt(servicePortString); - } catch (NumberFormatException nfe) { - if (logger.isWarnEnabled()) { - logger.warn("No HTTP port for service in {}", serviceName); - } - continue; - } + for (String serviceName : serviceNames.split(SPACES_COMMA_SPACES)) { + addService(serviceList, serviceName, properties); + } - final String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." - + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX); + return serviceList; + } - final String contextUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX); + private void addService(ArrayList<HttpServletServer> serviceList, String serviceName, Properties properties) { - final String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." - + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX); + String servicePrefix = PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + serviceName; - final String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." - + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX); + PropertyUtils props = new PropertyUtils(properties, servicePrefix, + (name, value, ex) -> logger + .warn("{}: {} {} is in invalid format for http service {} ", this, name, value, serviceName)); - final String authUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX); + int servicePort = props.getInteger(PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX, -1); + if (servicePort < 0) { + logger.warn("No HTTP port for service in {}", serviceName); + return; + } - final String restClasses = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX); + final String hostName = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, null); + final String contextUriPath = + props.getString(PolicyEndPointProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX, null); + boolean managed = props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true); + boolean swagger = props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, false); + boolean https = props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false); - final String filterClasses = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_FILTER_CLASSES_SUFFIX); + // create the service + HttpServletServer service = build(serviceName, https, hostName, servicePort, contextUriPath, swagger, managed); - final String restPackages = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX); + // configure the service + setSerializationProvider(props, service); + setAuthentication(props, service, contextUriPath); - final String restUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX); + final String restUriPath = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX, null); - final String classProv = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER); - - final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } + addFilterClasses(props, service, restUriPath); + addServletClasses(props, service, restUriPath); + addServletPackages(props, service, restUriPath); - String swaggerString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." - + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX); - boolean swagger = false; - if (swaggerString != null && !swaggerString.isEmpty()) { - swagger = Boolean.parseBoolean(swaggerString); - } + serviceList.add(service); + } - String httpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." - + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - boolean https = false; - if (httpsString != null && !httpsString.isEmpty()) { - https = Boolean.parseBoolean(httpsString); - } + private void setSerializationProvider(PropertyUtils props, HttpServletServer service) { - String aafString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." - + serviceName + PolicyEndPointProperties.PROPERTY_AAF_SUFFIX); - boolean aaf = false; - if (aafString != null && !aafString.isEmpty()) { - aaf = Boolean.parseBoolean(aafString); - } + final String classProv = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_SERIALIZATION_PROVIDER, null); - - HttpServletServer service = build(serviceName, https, hostName, servicePort, contextUriPath, swagger, - managed); + if (!StringUtils.isBlank(classProv)) { + service.setSerializationProvider(classProv); + } + } - if (classProv != null && !classProv.isEmpty()) { - service.setSerializationProvider(classProv); - } + private void setAuthentication(PropertyUtils props, HttpServletServer service, final String contextUriPath) { + /* authentication method either AAF or HTTP Basic Auth */ - /* authentication method either AAF or HTTP Basic Auth */ + boolean aaf = props.getBoolean(PolicyEndPointProperties.PROPERTY_AAF_SUFFIX, false); + final String userName = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, null); + final String password = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, null); + final String authUriPath = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX, null); - if (aaf) { - service.setAafAuthentication(contextUriPath); - } else if (userName != null && !userName.isEmpty() && password != null && !password.isEmpty()) { - service.setBasicAuthentication(userName, password, authUriPath); - } + if (aaf) { + service.setAafAuthentication(contextUriPath); + } else if (!StringUtils.isBlank(userName) && !StringUtils.isBlank(password)) { + service.setBasicAuthentication(userName, password, authUriPath); + } + } - if (filterClasses != null && !filterClasses.isEmpty()) { - List<String> filterClassesList = Arrays.asList(filterClasses.split(SPACES_COMMA_SPACES)); - for (String filterClass : filterClassesList) { - service.addFilterClass(restUriPath, filterClass); - } - } + private void addFilterClasses(PropertyUtils props, HttpServletServer service, final String restUriPath) { - if (restClasses != null && !restClasses.isEmpty()) { - List<String> restClassesList = Arrays.asList(restClasses.split(SPACES_COMMA_SPACES)); - for (String restClass : restClassesList) { - service.addServletClass(restUriPath, restClass); - } - } + final String filterClasses = + props.getString(PolicyEndPointProperties.PROPERTY_HTTP_FILTER_CLASSES_SUFFIX, null); - if (restPackages != null && !restPackages.isEmpty()) { - List<String> restPackageList = Arrays.asList(restPackages.split(SPACES_COMMA_SPACES)); - for (String restPackage : restPackageList) { - service.addServletPackage(restUriPath, restPackage); - } + if (!StringUtils.isBlank(filterClasses)) { + for (String filterClass : filterClasses.split(SPACES_COMMA_SPACES)) { + service.addFilterClass(restUriPath, filterClass); } + } + } + + private void addServletClasses(PropertyUtils props, HttpServletServer service, final String restUriPath) { + + final String restClasses = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, null); - serviceList.add(service); + if (!StringUtils.isBlank(restClasses)) { + for (String restClass : restClasses.split(SPACES_COMMA_SPACES)) { + service.addServletClass(restUriPath, restClass); + } } + } - return serviceList; + private void addServletPackages(PropertyUtils props, HttpServletServer service, final String restUriPath) { + + final String restPackages = props.getString(PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX, null); + + if (!StringUtils.isBlank(restPackages)) { + for (String restPackage : restPackages.split(SPACES_COMMA_SPACES)) { + service.addServletPackage(restUriPath, restPackage); + } + } } @Override diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java new file mode 100644 index 00000000..6b44e5c8 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/DmaapPropertyUtils.java @@ -0,0 +1,122 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.utils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSourceFactory; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; + +public class DmaapPropertyUtils { + + /** + * Maps a topic property to a DME property. + */ + private static final Map<String,String> PROP_TO_DME; + + static { + Map<String,String> map = new HashMap<>(); + + map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX, + DmaapTopicSourceFactory.DME2_ROUTE_OFFER_PROPERTY); + + map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX, + DmaapTopicSourceFactory.DME2_READ_TIMEOUT_PROPERTY); + + map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX, + DmaapTopicSourceFactory.DME2_EP_CONN_TIMEOUT_PROPERTY); + + map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX, + DmaapTopicSourceFactory.DME2_ROUNDTRIP_TIMEOUT_PROPERTY); + + map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX, + DmaapTopicSourceFactory.DME2_VERSION_PROPERTY); + + map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX, + DmaapTopicSourceFactory.DME2_SUBCONTEXT_PATH_PROPERTY); + + map.put(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX, + DmaapTopicSourceFactory.DME2_SESSION_STICKINESS_REQUIRED_PROPERTY); + + PROP_TO_DME = Collections.unmodifiableMap(map); + } + + private DmaapPropertyUtils() { + // do nothing + } + + /** + * Makes a topic builder, configuring it with properties that are common to both + * sources and sinks. + * + * @param props properties to be used to configure the builder + * @param topic topic being configured + * @param servers target servers + * @return a topic builder + */ + public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) { + + /* Additional DME2 Properties */ + + Map<String, String> dme2AdditionalProps = new HashMap<>(); + + for (Map.Entry<String, String> ent : PROP_TO_DME.entrySet()) { + String propName = ent.getKey(); + String value = props.getString(propName, null); + + if (!StringUtils.isBlank(value)) { + String dmeName = ent.getValue(); + dme2AdditionalProps.put(dmeName, value); + } + } + + final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + return BusTopicParams.builder() + .servers(serverList) + .topic(topic) + .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, + topic)) + .apiKey(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX, null)) + .apiSecret(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX, null)) + .userName(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX, null)) + .password(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX, null)) + .environment(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX, + null)) + .aftEnvironment(props.getString( + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX, null)) + .partner(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX, null)) + .latitude(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX, null)) + .longitude(props.getString(PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX, null)) + .additionalProps(dme2AdditionalProps) + .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true)) + .useHttps(props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false)) + .allowSelfSignedCerts(props.getBoolean( + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, false)); + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/PropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/PropertyUtils.java new file mode 100644 index 00000000..265346c9 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/PropertyUtils.java @@ -0,0 +1,135 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.utils; + +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; + +/** + * Utilities for extracting property values and converting them to other types. + */ +public class PropertyUtils { + /** + * Properties on which to work. + */ + private Properties properties; + + /** + * Prefix to prepend to property names. + */ + private String prefix; + + /** + * Function to invoke if a property value is invalid. + */ + private TriConsumer<String, String, Exception> invalidHandler; + + + /** + * Constructs the object. + * + * @param properties properties on which to work + * @param prefix prefix to prepend to property names + * @param invalidHandler function to invoke if a property value is invalid + */ + public PropertyUtils(Properties properties, String prefix, TriConsumer<String, String, Exception> invalidHandler) { + this.properties = properties; + this.prefix = prefix; + this.invalidHandler = invalidHandler; + } + + /** + * Gets a string property. + * + * @param propName name of the property whose value is to be retrieved + * @param defaultValue value to use if the property value is empty or does not exist + * @return the property's value + */ + public String getString(String propName, String defaultValue) { + String propValue = getProperty(propName); + return (StringUtils.isBlank(propValue) ? defaultValue : propValue); + } + + /** + * Gets a boolean property. + * + * @param propName name of the property whose value is to be retrieved + * @param defaultValue value to use if the property value is empty or does not exist + * @return the property's value + */ + public boolean getBoolean(String propName, boolean defaultValue) { + String propValue = getProperty(propName); + + if (!StringUtils.isBlank(propValue)) { + return Boolean.parseBoolean(propValue); + } + + return defaultValue; + } + + /** + * Gets an integer property. + * + * @param propName name of the property whose value is to be retrieved + * @param defaultValue value to use if the property value is empty or does not exist + * @return the property's value + */ + public int getInteger(String propName, int defaultValue) { + String propValue = getProperty(propName); + + if (!StringUtils.isBlank(propValue)) { + try { + return Integer.parseInt(propValue); + + } catch (NumberFormatException nfe) { + invalidHandler.accept(getFullName(propName), propValue, nfe); + } + } + + return defaultValue; + } + + + /** + * Gets a property's value. + * + * @param propName name of the property whose value is to be retrieved + * @return the property's value, or {@code null} if it does not exist + */ + private String getProperty(String propName) { + return properties.getProperty(getFullName(propName)); + } + + /** + * Gets the full property name, with the prefix prepended. + * + * @param propName property name, without the prefix + * @return the full property name + */ + private String getFullName(String propName) { + return prefix + propName; + } + + @FunctionalInterface + public static interface TriConsumer<A,B,C> { + public void accept(A propName, B propValue, C exception); + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/UebPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/UebPropertyUtils.java new file mode 100644 index 00000000..d0217518 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/UebPropertyUtils.java @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.utils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; + +public class UebPropertyUtils { + + private UebPropertyUtils() { + // do nothing + } + + /** + * Makes a topic builder, configuring it with properties that are common to both + * sources and sinks. + * + * @param props properties to be used to configure the builder + * @param topic topic being configured + * @param servers target servers + * @return a topic builder + */ + public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) { + + final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + return BusTopicParams.builder() + .servers(serverList) + .topic(topic) + .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, + topic)) + .apiKey(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX, null)) + .apiSecret(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX, null)) + .consumerGroup(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null)) + .consumerInstance(props.getString( + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null)) + .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true)) + .useHttps(props.getBoolean(PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX, false)) + .allowSelfSignedCerts(props.getBoolean( + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, + false)); + } +} |