diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org')
12 files changed, 1634 insertions, 1497 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java index 4c36a39a..a3eb4df6 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,18 +21,9 @@ package org.onap.policy.common.endpoints.event.comm.bus; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; - import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * DMAAP Topic Sink Factory. @@ -121,296 +112,3 @@ public interface DmaapTopicSinkFactory { */ List<DmaapTopicSink> inventory(); } - - -/* ------------- implementation ----------------- */ - -/** - * Factory of DMAAP Reader Topics indexed by topic name. - */ -class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { - - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class); - - /** - * DMAAP Topic Name Index. - */ - protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>(); - - @Override - public DmaapTopicSink build(BusTopicParams busTopicParams) { - - if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) { - return dmaapTopicWriters.get(busTopicParams.getTopic()); - } - - DmaapTopicSink dmaapTopicSink = makeSink(busTopicParams); - - if (busTopicParams.isManaged()) { - dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink); - } - return dmaapTopicSink; - } - } - - @Override - public DmaapTopicSink build(List<String> servers, String topic) { - return this.build(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .managed(true) - .useHttps(false) - .allowSelfSignedCerts(false) - .build()); - } - - @Override - public List<DmaapTopicSink> build(Properties properties) { - - String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS); - if (writeTopics == null || writeTopics.isEmpty()) { - logger.info("{}: no topic for DMaaP Sink", this); - return new ArrayList<>(); - } - - List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); - List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>(); - synchronized (this) { - for (String topic : writeTopicList) { - if (this.dmaapTopicWriters.containsKey(topic)) { - newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic)); - continue; - } - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - List<String> serverList; - if (servers != null && !servers.isEmpty()) { - serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - } else { - serverList = new ArrayList<>(); - } - - final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - final String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); - final String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); - - final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); - - final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - - /* DME2 Properties */ - - final String dme2Environment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - - final String dme2AftEnvironment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - - final String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); - - final String dme2RouteOffer = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); - - final String dme2Latitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - - final String dme2Longitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - - final String dme2EpReadTimeoutMs = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); - - final String dme2EpConnTimeout = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); - - final String dme2RoundtripTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); - - final String dme2Version = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); - - final String dme2SubContextPath = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); - - final String dme2SessionStickinessRequired = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS - + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); - - Map<String, String> dme2AdditionalProps = new HashMap<>(); - - if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); - } - if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { - dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); - } - if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); - } - if (dme2Version != null && !dme2Version.isEmpty()) { - dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); - } - if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { - dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); - } - if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { - dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); - } - - if (servers == null || servers.isEmpty()) { - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - continue; - } - - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .userName(aafMechId) - .password(aafPassword) - .partitionId(partitionKey) - .environment(dme2Environment) - .aftEnvironment(dme2AftEnvironment) - .partner(dme2Partner) - .latitude(dme2Latitude) - .longitude(dme2Longitude) - .additionalProps(dme2AdditionalProps) - .managed(managed) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - newDmaapTopicSinks.add(dmaapTopicSink); - } - return newDmaapTopicSinks; - } - } - - /** - * Makes a new sink. - * - * @param busTopicParams parameters to use to configure the sink - * @return a new sink - */ - protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) { - return new InlineDmaapTopicSink(busTopicParams); - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - DmaapTopicSink dmaapTopicWriter; - synchronized (this) { - if (!dmaapTopicWriters.containsKey(topic)) { - return; - } - - dmaapTopicWriter = dmaapTopicWriters.remove(topic); - } - - dmaapTopicWriter.shutdown(); - } - - @Override - public void destroy() { - List<DmaapTopicSink> writers = this.inventory(); - for (DmaapTopicSink writer : writers) { - writer.shutdown(); - } - - synchronized (this) { - this.dmaapTopicWriters.clear(); - } - } - - @Override - public DmaapTopicSink get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicWriters.containsKey(topic)) { - return dmaapTopicWriters.get(topic); - } else { - throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); - } - } - } - - @Override - public synchronized List<DmaapTopicSink> inventory() { - return new ArrayList<>(this.dmaapTopicWriters.values()); - } - - @Override - public String toString() { - return "IndexedDmaapTopicSinkFactory []"; - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java index ae6c6c3b..35a79bf1 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,18 +21,9 @@ package org.onap.policy.common.endpoints.event.comm.bus; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; - import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * DMAAP Topic Source Factory. @@ -119,349 +110,3 @@ public interface DmaapTopicSourceFactory { */ List<DmaapTopicSource> inventory(); } - - -/* ------------- implementation ----------------- */ - -/** - * Factory of DMAAP Source Topics indexed by topic name. - */ - -class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class); - - /** - * DMaaP Topic Name Index. - */ - protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>(); - - @Override - public DmaapTopicSource build(BusTopicParams busTopicParams) { - - if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) { - return dmaapTopicSources.get(busTopicParams.getTopic()); - } - - DmaapTopicSource dmaapTopicSource = makeSource(busTopicParams); - - if (busTopicParams.isManaged()) { - dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource); - } - return dmaapTopicSource; - } - } - - @Override - public List<DmaapTopicSource> build(Properties properties) { - - String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS); - if (readTopics == null || readTopics.isEmpty()) { - logger.info("{}: no topic for DMaaP Source", this); - return new ArrayList<>(); - } - List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); - - List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>(); - synchronized (this) { - for (String topic : readTopicList) { - if (this.dmaapTopicSources.containsKey(topic)) { - dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - List<String> serverList; - if (servers != null && !servers.isEmpty()) { - serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - } else { - serverList = new ArrayList<>(); - } - - final String apiKey = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - - final String apiSecret = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - final String aafMechId = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); - - final String aafPassword = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); - - final String consumerGroup = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); - - final String consumerInstance = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); - - final String fetchTimeoutString = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); - - /* DME2 Properties */ - - final String dme2Environment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); - - final String dme2AftEnvironment = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); - - final String dme2Partner = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); - - final String dme2RouteOffer = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); - - final String dme2Latitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); - - final String dme2Longitude = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); - - final String dme2EpReadTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); - - final String dme2EpConnTimeout = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); - - final String dme2RoundtripTimeoutMs = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); - - final String dme2Version = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); - - final String dme2SubContextPath = properties.getProperty( - PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); - - final String dme2SessionStickinessRequired = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); - - Map<String, String> dme2AdditionalProps = new HashMap<>(); - - if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); - } - if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { - dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); - } - if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); - } - if (dme2Version != null && !dme2Version.isEmpty()) { - dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); - } - if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { - dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - } - if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { - dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); - } - if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { - dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); - } - - - if (servers == null || servers.isEmpty()) { - - logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); - continue; - } - - int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH; - if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { - try { - fetchTimeout = Integer.parseInt(fetchTimeoutString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, - topic); - } - } - - String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); - int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH; - if (fetchLimitString != null && !fetchLimitString.isEmpty()) { - try { - fetchLimit = Integer.parseInt(fetchLimitString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, - topic); - } - } - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - - DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .userName(aafMechId) - .password(aafPassword) - .consumerGroup(consumerGroup) - .consumerInstance(consumerInstance) - .fetchTimeout(fetchTimeout) - .fetchLimit(fetchLimit) - .environment(dme2Environment) - .aftEnvironment(dme2AftEnvironment) - .partner(dme2Partner) - .latitude(dme2Latitude) - .longitude(dme2Longitude) - .additionalProps(dme2AdditionalProps) - .managed(managed) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - - dmaapTopicSourceLst.add(uebTopicSource); - } - } - return dmaapTopicSourceLst; - } - - @Override - public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) { - return this.build(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH) - .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH) - .managed(true) - .useHttps(false) - .allowSelfSignedCerts(false) - .build()); - } - - @Override - public DmaapTopicSource build(List<String> servers, String topic) { - return this.build(servers, topic, null, null); - } - - /** - * Makes a new source. - * - * @param busTopicParams parameters to use to configure the source - * @return a new source - */ - protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) { - return new SingleThreadedDmaapTopicSource(busTopicParams); - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - DmaapTopicSource uebTopicSource; - - synchronized (this) { - if (!dmaapTopicSources.containsKey(topic)) { - return; - } - - uebTopicSource = dmaapTopicSources.remove(topic); - } - - uebTopicSource.shutdown(); - } - - @Override - public void destroy() { - List<DmaapTopicSource> readers = this.inventory(); - for (DmaapTopicSource reader : readers) { - reader.shutdown(); - } - - synchronized (this) { - this.dmaapTopicSources.clear(); - } - } - - @Override - public DmaapTopicSource get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (dmaapTopicSources.containsKey(topic)) { - return dmaapTopicSources.get(topic); - } else { - throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found"); - } - } - } - - @Override - public synchronized List<DmaapTopicSource> inventory() { - return new ArrayList<>(this.dmaapTopicSources.values()); - } - - @Override - public String toString() { - return "IndexedDmaapTopicSourceFactory []"; - } - -} - diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java new file mode 100644 index 00000000..659833ce --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSinkFactory.java @@ -0,0 +1,324 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory of DMAAP Reader Topics indexed by topic name. + */ +class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { + + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger. + */ + private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class); + + /** + * DMAAP Topic Name Index. + */ + protected HashMap<String, DmaapTopicSink> dmaapTopicWriters = new HashMap<>(); + + @Override + public DmaapTopicSink build(BusTopicParams busTopicParams) { + + if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicWriters.containsKey(busTopicParams.getTopic())) { + return dmaapTopicWriters.get(busTopicParams.getTopic()); + } + + DmaapTopicSink dmaapTopicSink = makeSink(busTopicParams); + + if (busTopicParams.isManaged()) { + dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink); + } + return dmaapTopicSink; + } + } + + @Override + public DmaapTopicSink build(List<String> servers, String topic) { + return this.build(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .managed(true) + .useHttps(false) + .allowSelfSignedCerts(false) + .build()); + } + + @Override + public List<DmaapTopicSink> build(Properties properties) { + + String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS); + if (writeTopics == null || writeTopics.isEmpty()) { + logger.info("{}: no topic for DMaaP Sink", this); + return new ArrayList<>(); + } + + List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); + List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<>(); + synchronized (this) { + for (String topic : writeTopicList) { + if (this.dmaapTopicWriters.containsKey(topic)) { + newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic)); + continue; + } + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + List<String> serverList; + if (servers != null && !servers.isEmpty()) { + serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + } else { + serverList = new ArrayList<>(); + } + + final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + + final String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); + final String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); + + final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); + + final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + + /* DME2 Properties */ + + final String dme2Environment = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + + final String dme2AftEnvironment = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + + final String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); + + final String dme2RouteOffer = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); + + final String dme2Latitude = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + + final String dme2Longitude = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + + final String dme2EpReadTimeoutMs = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); + + final String dme2EpConnTimeout = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); + + final String dme2RoundtripTimeoutMs = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); + + final String dme2Version = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); + + final String dme2SubContextPath = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); + + final String dme2SessionStickinessRequired = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); + + Map<String, String> dme2AdditionalProps = new HashMap<>(); + + if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { + dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); + } + if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { + dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); + } + if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { + dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); + } + if (dme2Version != null && !dme2Version.isEmpty()) { + dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); + } + if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { + dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + } + if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { + dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); + } + if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { + dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); + } + + if (servers == null || servers.isEmpty()) { + logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); + continue; + } + + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + // default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()) { + useHttps = Boolean.parseBoolean(useHttpsString); + } + + String allowSelfSignedCertsString = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + // default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + DmaapTopicSink dmaapTopicSink = this.build(BusTopicParams.builder() + .servers(serverList) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .userName(aafMechId) + .password(aafPassword) + .partitionId(partitionKey) + .environment(dme2Environment) + .aftEnvironment(dme2AftEnvironment) + .partner(dme2Partner) + .latitude(dme2Latitude) + .longitude(dme2Longitude) + .additionalProps(dme2AdditionalProps) + .managed(managed) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); + + newDmaapTopicSinks.add(dmaapTopicSink); + } + return newDmaapTopicSinks; + } + } + + /** + * Makes a new sink. + * + * @param busTopicParams parameters to use to configure the sink + * @return a new sink + */ + protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) { + return new InlineDmaapTopicSink(busTopicParams); + } + + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + DmaapTopicSink dmaapTopicWriter; + synchronized (this) { + if (!dmaapTopicWriters.containsKey(topic)) { + return; + } + + dmaapTopicWriter = dmaapTopicWriters.remove(topic); + } + + dmaapTopicWriter.shutdown(); + } + + @Override + public void destroy() { + List<DmaapTopicSink> writers = this.inventory(); + for (DmaapTopicSink writer : writers) { + writer.shutdown(); + } + + synchronized (this) { + this.dmaapTopicWriters.clear(); + } + } + + @Override + public DmaapTopicSink get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicWriters.containsKey(topic)) { + return dmaapTopicWriters.get(topic); + } else { + throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); + } + } + } + + @Override + public synchronized List<DmaapTopicSink> inventory() { + return new ArrayList<>(this.dmaapTopicWriters.values()); + } + + @Override + public String toString() { + return "IndexedDmaapTopicSinkFactory []"; + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java new file mode 100644 index 00000000..0c008f11 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java @@ -0,0 +1,376 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory of DMAAP Source Topics indexed by topic name. + */ + +class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger. + */ + private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class); + + /** + * DMaaP Topic Name Index. + */ + protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>(); + + @Override + public DmaapTopicSource build(BusTopicParams busTopicParams) { + + if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicSources.containsKey(busTopicParams.getTopic())) { + return dmaapTopicSources.get(busTopicParams.getTopic()); + } + + DmaapTopicSource dmaapTopicSource = makeSource(busTopicParams); + + if (busTopicParams.isManaged()) { + dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource); + } + return dmaapTopicSource; + } + } + + @Override + public List<DmaapTopicSource> build(Properties properties) { + + String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS); + if (readTopics == null || readTopics.isEmpty()) { + logger.info("{}: no topic for DMaaP Source", this); + return new ArrayList<>(); + } + List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); + + List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>(); + synchronized (this) { + for (String topic : readTopicList) { + if (this.dmaapTopicSources.containsKey(topic)) { + dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + List<String> serverList; + if (servers != null && !servers.isEmpty()) { + serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + } else { + serverList = new ArrayList<>(); + } + + final String apiKey = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + + final String apiSecret = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + + final String aafMechId = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX); + + final String aafPassword = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX); + + final String consumerGroup = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); + + final String consumerInstance = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); + + final String fetchTimeoutString = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); + + /* DME2 Properties */ + + final String dme2Environment = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + + final String dme2AftEnvironment = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + + final String dme2Partner = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); + + final String dme2RouteOffer = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); + + final String dme2Latitude = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + + final String dme2Longitude = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + + final String dme2EpReadTimeoutMs = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); + + final String dme2EpConnTimeout = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); + + final String dme2RoundtripTimeoutMs = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); + + final String dme2Version = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); + + final String dme2SubContextPath = properties.getProperty( + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); + + final String dme2SessionStickinessRequired = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); + + Map<String, String> dme2AdditionalProps = new HashMap<>(); + + if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) { + dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); + } + if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) { + dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); + } + if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) { + dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); + } + if (dme2Version != null && !dme2Version.isEmpty()) { + dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); + } + if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) { + dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + } + if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) { + dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); + } + if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) { + dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); + } + + + if (servers == null || servers.isEmpty()) { + + logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this); + continue; + } + + int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH; + if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { + try { + fetchTimeout = Integer.parseInt(fetchTimeoutString); + } catch (NumberFormatException nfe) { + logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, + topic); + } + } + + String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); + int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH; + if (fetchLimitString != null && !fetchLimitString.isEmpty()) { + try { + fetchLimit = Integer.parseInt(fetchLimitString); + } catch (NumberFormatException nfe) { + logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, + topic); + } + } + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + // default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()) { + useHttps = Boolean.parseBoolean(useHttpsString); + } + + String allowSelfSignedCertsString = + properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + // default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + + DmaapTopicSource uebTopicSource = this.build(BusTopicParams.builder() + .servers(serverList) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .userName(aafMechId) + .password(aafPassword) + .consumerGroup(consumerGroup) + .consumerInstance(consumerInstance) + .fetchTimeout(fetchTimeout) + .fetchLimit(fetchLimit) + .environment(dme2Environment) + .aftEnvironment(dme2AftEnvironment) + .partner(dme2Partner) + .latitude(dme2Latitude) + .longitude(dme2Longitude) + .additionalProps(dme2AdditionalProps) + .managed(managed) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); + + dmaapTopicSourceLst.add(uebTopicSource); + } + } + return dmaapTopicSourceLst; + } + + @Override + public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) { + return this.build(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH) + .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH) + .managed(true) + .useHttps(false) + .allowSelfSignedCerts(false) + .build()); + } + + @Override + public DmaapTopicSource build(List<String> servers, String topic) { + return this.build(servers, topic, null, null); + } + + /** + * Makes a new source. + * + * @param busTopicParams parameters to use to configure the source + * @return a new source + */ + protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) { + return new SingleThreadedDmaapTopicSource(busTopicParams); + } + + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + DmaapTopicSource uebTopicSource; + + synchronized (this) { + if (!dmaapTopicSources.containsKey(topic)) { + return; + } + + uebTopicSource = dmaapTopicSources.remove(topic); + } + + uebTopicSource.shutdown(); + } + + @Override + public void destroy() { + List<DmaapTopicSource> readers = this.inventory(); + for (DmaapTopicSource reader : readers) { + reader.shutdown(); + } + + synchronized (this) { + this.dmaapTopicSources.clear(); + } + } + + @Override + public DmaapTopicSource get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (dmaapTopicSources.containsKey(topic)) { + return dmaapTopicSources.get(topic); + } else { + throw new IllegalStateException("DmaapTopiceSource for " + topic + " not found"); + } + } + } + + @Override + public synchronized List<DmaapTopicSource> inventory() { + return new ArrayList<>(this.dmaapTopicSources.values()); + } + + @Override + public String toString() { + return "IndexedDmaapTopicSourceFactory []"; + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java new file mode 100644 index 00000000..5b3fc669 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSinkFactory.java @@ -0,0 +1,237 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory of UEB Reader Topics indexed by topic name. + */ +class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger. + */ + private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); + + /** + * UEB Topic Name Index. + */ + protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>(); + + @Override + public UebTopicSink build(BusTopicParams busTopicParams) { + + if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { + throw new IllegalArgumentException("UEB Server(s) must be provided"); + } + + if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (uebTopicSinks.containsKey(busTopicParams.getTopic())) { + return uebTopicSinks.get(busTopicParams.getTopic()); + } + + UebTopicSink uebTopicWriter = makeSink(busTopicParams); + + if (busTopicParams.isManaged()) { + uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter); + } + + return uebTopicWriter; + } + } + + + @Override + public UebTopicSink build(List<String> servers, String topic) { + return this.build(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .managed(true) + .useHttps(false) + .allowSelfSignedCerts(false) + .build()); + } + + + @Override + public List<UebTopicSink> build(Properties properties) { + + String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS); + if (writeTopics == null || writeTopics.isEmpty()) { + logger.info("{}: no topic for UEB Sink", this); + return new ArrayList<>(); + } + + List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); + List<UebTopicSink> newUebTopicSinks = new ArrayList<>(); + synchronized (this) { + for (String topic : writeTopicList) { + if (this.uebTopicSinks.containsKey(topic)) { + newUebTopicSinks.add(this.uebTopicSinks.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + if (servers == null || servers.isEmpty()) { + logger.error("{}: no UEB servers configured for sink {}", this, topic); + continue; + } + + final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + // default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()) { + useHttps = Boolean.parseBoolean(useHttpsString); + } + + + String allowSelfSignedCertsString = + properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + // default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + UebTopicSink uebTopicWriter = this.build(BusTopicParams.builder() + .servers(serverList) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .partitionId(partitionKey) + .managed(managed) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts) + .build()); + newUebTopicSinks.add(uebTopicWriter); + } + return newUebTopicSinks; + } + } + + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + UebTopicSink uebTopicWriter; + synchronized (this) { + if (!uebTopicSinks.containsKey(topic)) { + return; + } + + uebTopicWriter = uebTopicSinks.remove(topic); + } + + uebTopicWriter.shutdown(); + } + + @Override + public void destroy() { + List<UebTopicSink> writers = this.inventory(); + for (UebTopicSink writer : writers) { + writer.shutdown(); + } + + synchronized (this) { + this.uebTopicSinks.clear(); + } + } + + @Override + public UebTopicSink get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (uebTopicSinks.containsKey(topic)) { + return uebTopicSinks.get(topic); + } else { + throw new IllegalStateException("UebTopicSink for " + topic + " not found"); + } + } + } + + @Override + public synchronized List<UebTopicSink> inventory() { + return new ArrayList<>(this.uebTopicSinks.values()); + } + + /** + * Makes a new sink. + * + * @param busTopicParams parameters to use to configure the sink + * @return a new sink + */ + protected UebTopicSink makeSink(BusTopicParams busTopicParams) { + return new InlineUebTopicSink(busTopicParams); + } + + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("IndexedUebTopicSinkFactory []"); + return builder.toString(); + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java new file mode 100644 index 00000000..88a472c2 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java @@ -0,0 +1,274 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory of UEB Source Topics indexed by topic name. + */ +class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { + private static final String MISSING_TOPIC = "A topic must be provided"; + + /** + * Logger. + */ + private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class); + + /** + * UEB Topic Name Index. + */ + protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>(); + + @Override + public UebTopicSource build(BusTopicParams busTopicParams) { + if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { + throw new IllegalArgumentException("UEB Server(s) must be provided"); + } + + if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (uebTopicSources.containsKey(busTopicParams.getTopic())) { + return uebTopicSources.get(busTopicParams.getTopic()); + } + + UebTopicSource uebTopicSource = makeSource(busTopicParams); + + if (busTopicParams.isManaged()) { + uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource); + } + + return uebTopicSource; + } + } + + @Override + public List<UebTopicSource> build(Properties properties) { + + String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS); + if (readTopics == null || readTopics.isEmpty()) { + logger.info("{}: no topic for UEB Source", this); + return new ArrayList<>(); + } + List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); + + List<UebTopicSource> newUebTopicSources = new ArrayList<>(); + synchronized (this) { + for (String topic : readTopicList) { + if (this.uebTopicSources.containsKey(topic)) { + newUebTopicSources.add(this.uebTopicSources.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + if (servers == null || servers.isEmpty()) { + logger.error("{}: no UEB servers configured for sink {}", this, topic); + continue; + } + + final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); + + final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); + + final String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); + + final String consumerInstance = properties.getProperty( + PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); + + String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); + int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH; + if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { + try { + fetchTimeout = Integer.parseInt(fetchTimeoutString); + } catch (NumberFormatException nfe) { + logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, + topic); + } + } + + String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); + int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH; + if (fetchLimitString != null && !fetchLimitString.isEmpty()) { + try { + fetchLimit = Integer.parseInt(fetchLimitString); + } catch (NumberFormatException nfe) { + logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, + topic); + } + } + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + // default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()) { + useHttps = Boolean.parseBoolean(useHttpsString); + } + + String allowSelfSignedCertsString = + properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + // default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + UebTopicSource uebTopicSource = this.build(BusTopicParams.builder() + .servers(serverList) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .consumerGroup(consumerGroup) + .consumerInstance(consumerInstance) + .fetchTimeout(fetchTimeout) + .fetchLimit(fetchLimit) + .managed(managed) + .useHttps(useHttps) + .allowSelfSignedCerts(allowSelfSignedCerts).build()); + newUebTopicSources.add(uebTopicSource); + } + } + return newUebTopicSources; + } + + @Override + public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) { + + return this.build(BusTopicParams.builder() + .servers(servers) + .topic(topic) + .apiKey(apiKey) + .apiSecret(apiSecret) + .fetchTimeout(UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH) + .fetchLimit(UebTopicSource.DEFAULT_LIMIT_FETCH) + .managed(true) + .useHttps(false) + .allowSelfSignedCerts(true).build()); + } + + @Override + public UebTopicSource build(List<String> servers, String topic) { + return this.build(servers, topic, null, null); + } + + /** + * Makes a new source. + * + * @param busTopicParams parameters to use to configure the source + * @return a new source + */ + protected UebTopicSource makeSource(BusTopicParams busTopicParams) { + return new SingleThreadedUebTopicSource(busTopicParams); + } + + @Override + public void destroy(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + UebTopicSource uebTopicSource; + + synchronized (this) { + if (!uebTopicSources.containsKey(topic)) { + return; + } + + uebTopicSource = uebTopicSources.remove(topic); + } + + uebTopicSource.shutdown(); + } + + @Override + public void destroy() { + List<UebTopicSource> readers = this.inventory(); + for (UebTopicSource reader : readers) { + reader.shutdown(); + } + + synchronized (this) { + this.uebTopicSources.clear(); + } + } + + @Override + public UebTopicSource get(String topic) { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException(MISSING_TOPIC); + } + + synchronized (this) { + if (uebTopicSources.containsKey(topic)) { + return uebTopicSources.get(topic); + } else { + throw new IllegalStateException("UebTopiceSource for " + topic + " not found"); + } + } + } + + @Override + public synchronized List<UebTopicSource> inventory() { + return new ArrayList<>(this.uebTopicSources.values()); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("IndexedUebTopicSourceFactory []"); + return builder.toString(); + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java index c200af5a..0cf095fd 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,17 +21,9 @@ package org.onap.policy.common.endpoints.event.comm.bus; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Properties; - import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * UEB Topic Sink Factory. @@ -98,210 +90,3 @@ public interface UebTopicSinkFactory { */ List<UebTopicSink> inventory(); } - - -/* ------------- implementation ----------------- */ - -/** - * Factory of UEB Reader Topics indexed by topic name. - */ -class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); - - /** - * UEB Topic Name Index. - */ - protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>(); - - @Override - public UebTopicSink build(BusTopicParams busTopicParams) { - - if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { - throw new IllegalArgumentException("UEB Server(s) must be provided"); - } - - if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSinks.containsKey(busTopicParams.getTopic())) { - return uebTopicSinks.get(busTopicParams.getTopic()); - } - - UebTopicSink uebTopicWriter = makeSink(busTopicParams); - - if (busTopicParams.isManaged()) { - uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter); - } - - return uebTopicWriter; - } - } - - - @Override - public UebTopicSink build(List<String> servers, String topic) { - return this.build(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .managed(true) - .useHttps(false) - .allowSelfSignedCerts(false) - .build()); - } - - - @Override - public List<UebTopicSink> build(Properties properties) { - - String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS); - if (writeTopics == null || writeTopics.isEmpty()) { - logger.info("{}: no topic for UEB Sink", this); - return new ArrayList<>(); - } - - List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*"))); - List<UebTopicSink> newUebTopicSinks = new ArrayList<>(); - synchronized (this) { - for (String topic : writeTopicList) { - if (this.uebTopicSinks.containsKey(topic)) { - newUebTopicSinks.add(this.uebTopicSinks.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (servers == null || servers.isEmpty()) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - continue; - } - - final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - UebTopicSink uebTopicWriter = this.build(BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .partitionId(partitionKey) - .managed(managed) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts) - .build()); - newUebTopicSinks.add(uebTopicWriter); - } - return newUebTopicSinks; - } - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - UebTopicSink uebTopicWriter; - synchronized (this) { - if (!uebTopicSinks.containsKey(topic)) { - return; - } - - uebTopicWriter = uebTopicSinks.remove(topic); - } - - uebTopicWriter.shutdown(); - } - - @Override - public void destroy() { - List<UebTopicSink> writers = this.inventory(); - for (UebTopicSink writer : writers) { - writer.shutdown(); - } - - synchronized (this) { - this.uebTopicSinks.clear(); - } - } - - @Override - public UebTopicSink get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSinks.containsKey(topic)) { - return uebTopicSinks.get(topic); - } else { - throw new IllegalStateException("UebTopicSink for " + topic + " not found"); - } - } - } - - @Override - public synchronized List<UebTopicSink> inventory() { - return new ArrayList<>(this.uebTopicSinks.values()); - } - - /** - * Makes a new sink. - * - * @param busTopicParams parameters to use to configure the sink - * @return a new sink - */ - protected UebTopicSink makeSink(BusTopicParams busTopicParams) { - return new InlineUebTopicSink(busTopicParams); - } - - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedUebTopicSinkFactory []"); - return builder.toString(); - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java index 96315881..beacee3b 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,17 +21,9 @@ package org.onap.policy.common.endpoints.event.comm.bus; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Properties; - import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * UEB Topic Source Factory. @@ -110,247 +102,3 @@ public interface UebTopicSourceFactory { */ List<UebTopicSource> inventory(); } - - -/* ------------- implementation ----------------- */ - -/** - * Factory of UEB Source Topics indexed by topic name. - */ -class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { - private static final String MISSING_TOPIC = "A topic must be provided"; - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class); - - /** - * UEB Topic Name Index. - */ - protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>(); - - @Override - public UebTopicSource build(BusTopicParams busTopicParams) { - if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) { - throw new IllegalArgumentException("UEB Server(s) must be provided"); - } - - if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSources.containsKey(busTopicParams.getTopic())) { - return uebTopicSources.get(busTopicParams.getTopic()); - } - - UebTopicSource uebTopicSource = makeSource(busTopicParams); - - if (busTopicParams.isManaged()) { - uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource); - } - - return uebTopicSource; - } - } - - @Override - public List<UebTopicSource> build(Properties properties) { - - String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS); - if (readTopics == null || readTopics.isEmpty()) { - logger.info("{}: no topic for UEB Source", this); - return new ArrayList<>(); - } - List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*"))); - - List<UebTopicSource> newUebTopicSources = new ArrayList<>(); - synchronized (this) { - for (String topic : readTopicList) { - if (this.uebTopicSources.containsKey(topic)) { - newUebTopicSources.add(this.uebTopicSources.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - if (servers == null || servers.isEmpty()) { - logger.error("{}: no UEB servers configured for sink {}", this, topic); - continue; - } - - final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX); - - final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX); - - final String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX); - - final String consumerInstance = properties.getProperty( - PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX); - - String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); - int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH; - if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { - try { - fetchTimeout = Integer.parseInt(fetchTimeoutString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString, - topic); - } - } - - String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS - + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX); - int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH; - if (fetchLimitString != null && !fetchLimitString.isEmpty()) { - try { - fetchLimit = Integer.parseInt(fetchLimitString); - } catch (NumberFormatException nfe) { - logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString, - topic); - } - } - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." - + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - - // default is to use HTTP if no https property exists - boolean useHttps = false; - if (useHttpsString != null && !useHttpsString.isEmpty()) { - useHttps = Boolean.parseBoolean(useHttpsString); - } - - String allowSelfSignedCertsString = - properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); - - // default is to disallow self-signed certs - boolean allowSelfSignedCerts = false; - if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) { - allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); - } - - UebTopicSource uebTopicSource = this.build(BusTopicParams.builder() - .servers(serverList) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .consumerGroup(consumerGroup) - .consumerInstance(consumerInstance) - .fetchTimeout(fetchTimeout) - .fetchLimit(fetchLimit) - .managed(managed) - .useHttps(useHttps) - .allowSelfSignedCerts(allowSelfSignedCerts).build()); - newUebTopicSources.add(uebTopicSource); - } - } - return newUebTopicSources; - } - - @Override - public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) { - - return this.build(BusTopicParams.builder() - .servers(servers) - .topic(topic) - .apiKey(apiKey) - .apiSecret(apiSecret) - .fetchTimeout(UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH) - .fetchLimit(UebTopicSource.DEFAULT_LIMIT_FETCH) - .managed(true) - .useHttps(false) - .allowSelfSignedCerts(true).build()); - } - - @Override - public UebTopicSource build(List<String> servers, String topic) { - return this.build(servers, topic, null, null); - } - - /** - * Makes a new source. - * - * @param busTopicParams parameters to use to configure the source - * @return a new source - */ - protected UebTopicSource makeSource(BusTopicParams busTopicParams) { - return new SingleThreadedUebTopicSource(busTopicParams); - } - - @Override - public void destroy(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - UebTopicSource uebTopicSource; - - synchronized (this) { - if (!uebTopicSources.containsKey(topic)) { - return; - } - - uebTopicSource = uebTopicSources.remove(topic); - } - - uebTopicSource.shutdown(); - } - - @Override - public void destroy() { - List<UebTopicSource> readers = this.inventory(); - for (UebTopicSource reader : readers) { - reader.shutdown(); - } - - synchronized (this) { - this.uebTopicSources.clear(); - } - } - - @Override - public UebTopicSource get(String topic) { - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException(MISSING_TOPIC); - } - - synchronized (this) { - if (uebTopicSources.containsKey(topic)) { - return uebTopicSources.get(topic); - } else { - throw new IllegalStateException("UebTopiceSource for " + topic + " not found"); - } - } - } - - @Override - public synchronized List<UebTopicSource> inventory() { - return new ArrayList<>(this.uebTopicSources.values()); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedUebTopicSourceFactory []"); - return builder.toString(); - } -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientFactory.java index ca10680b..133dfaea 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/HttpClientFactory.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -23,18 +23,9 @@ package org.onap.policy.common.endpoints.http.client; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Properties; - -import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; -import org.onap.policy.common.endpoints.http.client.internal.JerseyClient; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Http Client Factory. @@ -76,149 +67,3 @@ public interface HttpClientFactory { public void destroy(); } - - -/** - * HTTP client factory implementation indexed by name. - */ -class IndexedHttpClientFactory implements HttpClientFactory { - - /** - * Logger. - */ - private static Logger logger = LoggerFactory.getLogger(IndexedHttpClientFactory.class); - - protected HashMap<String, HttpClient> clients = new HashMap<>(); - - @Override - public synchronized HttpClient build(BusTopicParams busTopicParams) - throws KeyManagementException, NoSuchAlgorithmException { - if (clients.containsKey(busTopicParams.getClientName())) { - return clients.get(busTopicParams.getClientName()); - } - - JerseyClient client = - new JerseyClient(busTopicParams); - - if (busTopicParams.isManaged()) { - clients.put(busTopicParams.getClientName(), client); - } - - return client; - } - - @Override - public synchronized List<HttpClient> build(Properties properties) - throws KeyManagementException, NoSuchAlgorithmException { - ArrayList<HttpClient> clientList = new ArrayList<>(); - - String clientNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES); - if (clientNames == null || clientNames.isEmpty()) { - return clientList; - } - - List<String> clientNameList = new ArrayList<>(Arrays.asList(clientNames.split("\\s*,\\s*"))); - - for (String clientName : clientNameList) { - String httpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." - + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - boolean https = false; - if (StringUtils.isNotBlank(httpsString)) { - https = Boolean.parseBoolean(httpsString); - } - - String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." - + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX); - - String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES - + "." + clientName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX); - int port; - try { - if (servicePortString == null || servicePortString.isEmpty()) { - continue; - } - port = Integer.parseInt(servicePortString); - } catch (NumberFormatException nfe) { - logger.error("http-client-factory: cannot parse port {}", servicePortString, nfe); - continue; - } - - String baseUrl = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." - + clientName + PolicyEndPointProperties.PROPERTY_HTTP_URL_SUFFIX); - - String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." - + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX); - - String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." - + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX); - - String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." - + clientName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - try { - HttpClient client = - this.build(BusTopicParams.builder() - .clientName(clientName) - .useHttps(https) - .allowSelfSignedCerts(https) - .hostname(hostName) - .port(port) - .basePath(baseUrl) - .userName(userName) - .password(password) - .managed(managed) - .build()); - clientList.add(client); - } catch (Exception e) { - logger.error("http-client-factory: cannot build client {}", clientName, e); - } - } - - return clientList; - } - - @Override - public synchronized HttpClient get(String name) { - if (clients.containsKey(name)) { - return clients.get(name); - } - - throw new IllegalArgumentException("Http Client " + name + " not found"); - } - - @Override - public synchronized List<HttpClient> inventory() { - return new ArrayList<>(this.clients.values()); - } - - @Override - public synchronized void destroy(String name) { - if (!clients.containsKey(name)) { - return; - } - - HttpClient client = clients.remove(name); - try { - client.shutdown(); - } catch (IllegalStateException e) { - logger.error("http-client-factory: cannot shutdown client {}", client, e); - } - } - - @Override - public void destroy() { - List<HttpClient> clientsInventory = this.inventory(); - for (HttpClient client : clientsInventory) { - client.shutdown(); - } - - synchronized (this) { - this.clients.clear(); - } - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java new file mode 100644 index 00000000..d4d4a281 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/client/IndexedHttpClientFactory.java @@ -0,0 +1,181 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.http.client; + +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.lang3.StringUtils; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; +import org.onap.policy.common.endpoints.http.client.internal.JerseyClient; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HTTP client factory implementation indexed by name. + */ +class IndexedHttpClientFactory implements HttpClientFactory { + + /** + * Logger. + */ + private static Logger logger = LoggerFactory.getLogger(IndexedHttpClientFactory.class); + + protected HashMap<String, HttpClient> clients = new HashMap<>(); + + @Override + public synchronized HttpClient build(BusTopicParams busTopicParams) + throws KeyManagementException, NoSuchAlgorithmException { + if (clients.containsKey(busTopicParams.getClientName())) { + return clients.get(busTopicParams.getClientName()); + } + + JerseyClient client = + new JerseyClient(busTopicParams); + + if (busTopicParams.isManaged()) { + clients.put(busTopicParams.getClientName(), client); + } + + return client; + } + + @Override + public synchronized List<HttpClient> build(Properties properties) + throws KeyManagementException, NoSuchAlgorithmException { + ArrayList<HttpClient> clientList = new ArrayList<>(); + + String clientNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES); + if (clientNames == null || clientNames.isEmpty()) { + return clientList; + } + + List<String> clientNameList = new ArrayList<>(Arrays.asList(clientNames.split("\\s*,\\s*"))); + + for (String clientName : clientNameList) { + String httpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + boolean https = false; + if (StringUtils.isNotBlank(httpsString)) { + https = Boolean.parseBoolean(httpsString); + } + + String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX); + + String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + + "." + clientName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX); + int port; + try { + if (servicePortString == null || servicePortString.isEmpty()) { + continue; + } + port = Integer.parseInt(servicePortString); + } catch (NumberFormatException nfe) { + logger.error("http-client-factory: cannot parse port {}", servicePortString, nfe); + continue; + } + + String baseUrl = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + PolicyEndPointProperties.PROPERTY_HTTP_URL_SUFFIX); + + String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX); + + String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX); + + String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + + clientName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + try { + HttpClient client = + this.build(BusTopicParams.builder() + .clientName(clientName) + .useHttps(https) + .allowSelfSignedCerts(https) + .hostname(hostName) + .port(port) + .basePath(baseUrl) + .userName(userName) + .password(password) + .managed(managed) + .build()); + clientList.add(client); + } catch (Exception e) { + logger.error("http-client-factory: cannot build client {}", clientName, e); + } + } + + return clientList; + } + + @Override + public synchronized HttpClient get(String name) { + if (clients.containsKey(name)) { + return clients.get(name); + } + + throw new IllegalArgumentException("Http Client " + name + " not found"); + } + + @Override + public synchronized List<HttpClient> inventory() { + return new ArrayList<>(this.clients.values()); + } + + @Override + public synchronized void destroy(String name) { + if (!clients.containsKey(name)) { + return; + } + + HttpClient client = clients.remove(name); + try { + client.shutdown(); + } catch (IllegalStateException e) { + logger.error("http-client-factory: cannot shutdown client {}", client, e); + } + } + + @Override + public void destroy() { + List<HttpClient> clientsInventory = this.inventory(); + for (HttpClient client : clientsInventory) { + client.shutdown(); + } + + synchronized (this) { + this.clients.clear(); + } + } + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java index 527ada99..0c30e3e5 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/HttpServletServerFactory.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,15 +20,8 @@ package org.onap.policy.common.endpoints.http.server; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Properties; -import org.onap.policy.common.endpoints.http.server.internal.JettyJerseyServer; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Factory of HTTP Servlet-Enabled Servlets. @@ -101,208 +94,3 @@ public interface HttpServletServerFactory { */ void destroy(); } - -/** - * Indexed factory implementation. - */ -class IndexedHttpServletServerFactory implements HttpServletServerFactory { - - private static final String SPACES_COMMA_SPACES = "\\s*,\\s*"; - - /** - * logger. - */ - protected static Logger logger = LoggerFactory.getLogger(IndexedHttpServletServerFactory.class); - - /** - * servers index. - */ - protected HashMap<Integer, HttpServletServer> servers = new HashMap<>(); - - @Override - public synchronized HttpServletServer build(String name, boolean https, String host, int port, String contextPath, - boolean swagger, boolean managed) { - - if (servers.containsKey(port)) { - return servers.get(port); - } - - JettyJerseyServer server = new JettyJerseyServer(name, https, host, port, contextPath, swagger); - if (managed) { - servers.put(port, server); - } - - return server; - } - - @Override - public synchronized HttpServletServer build(String name, String host, int port, String contextPath, boolean swagger, - boolean managed) { - return build(name, false, host, port, contextPath, swagger, managed); - } - - @Override - public synchronized List<HttpServletServer> build(Properties properties) { - - ArrayList<HttpServletServer> serviceList = new ArrayList<>(); - - String serviceNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES); - if (serviceNames == null || serviceNames.isEmpty()) { - logger.warn("No topic for HTTP Service: {}", properties); - return serviceList; - } - - List<String> serviceNameList = Arrays.asList(serviceNames.split(SPACES_COMMA_SPACES)); - - for (String serviceName : serviceNameList) { - String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX); - - int servicePort; - try { - if (servicePortString == null || servicePortString.isEmpty()) { - if (logger.isWarnEnabled()) { - logger.warn("No HTTP port for service in {}", serviceName); - } - continue; - } - servicePort = Integer.parseInt(servicePortString); - } catch (NumberFormatException nfe) { - if (logger.isWarnEnabled()) { - logger.warn("No HTTP port for service in {}", serviceName); - } - continue; - } - - final String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." - + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX); - - final String contextUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX); - - final String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." - + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX); - - final String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." - + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX); - - final String authUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX); - - final String restClasses = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX); - - final String filterClasses = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_FILTER_CLASSES_SUFFIX); - - final String restPackages = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX); - - final String restUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX); - - final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES - + "." + serviceName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - String swaggerString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." - + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX); - boolean swagger = false; - if (swaggerString != null && !swaggerString.isEmpty()) { - swagger = Boolean.parseBoolean(swaggerString); - } - - String httpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." - + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); - boolean https = false; - if (httpsString != null && !httpsString.isEmpty()) { - https = Boolean.parseBoolean(httpsString); - } - - String aafString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." - + serviceName + PolicyEndPointProperties.PROPERTY_AAF_SUFFIX); - boolean aaf = false; - if (aafString != null && !aafString.isEmpty()) { - aaf = Boolean.parseBoolean(aafString); - } - - HttpServletServer service = build(serviceName, https, hostName, servicePort, contextUriPath, swagger, - managed); - - /* authentication method either AAF or HTTP Basic Auth */ - - if (aaf) { - service.setAafAuthentication(contextUriPath); - } else if (userName != null && !userName.isEmpty() && password != null && !password.isEmpty()) { - service.setBasicAuthentication(userName, password, authUriPath); - } - - if (filterClasses != null && !filterClasses.isEmpty()) { - List<String> filterClassesList = Arrays.asList(filterClasses.split(SPACES_COMMA_SPACES)); - for (String filterClass : filterClassesList) { - service.addFilterClass(restUriPath, filterClass); - } - } - - if (restClasses != null && !restClasses.isEmpty()) { - List<String> restClassesList = Arrays.asList(restClasses.split(SPACES_COMMA_SPACES)); - for (String restClass : restClassesList) { - service.addServletClass(restUriPath, restClass); - } - } - - if (restPackages != null && !restPackages.isEmpty()) { - List<String> restPackageList = Arrays.asList(restPackages.split(SPACES_COMMA_SPACES)); - for (String restPackage : restPackageList) { - service.addServletPackage(restUriPath, restPackage); - } - } - - serviceList.add(service); - } - - return serviceList; - } - - @Override - public synchronized HttpServletServer get(int port) { - - if (servers.containsKey(port)) { - return servers.get(port); - } - - throw new IllegalArgumentException("Http Server for " + port + " not found"); - } - - @Override - public synchronized List<HttpServletServer> inventory() { - return new ArrayList<>(this.servers.values()); - } - - @Override - public synchronized void destroy(int port) { - - if (!servers.containsKey(port)) { - return; - } - - HttpServletServer server = servers.remove(port); - server.shutdown(); - } - - @Override - public synchronized void destroy() { - List<HttpServletServer> httpServletServers = this.inventory(); - for (HttpServletServer server : httpServletServers) { - server.shutdown(); - } - - synchronized (this) { - this.servers.clear(); - } - } - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java new file mode 100644 index 00000000..ad8ef99c --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/http/server/IndexedHttpServletServerFactory.java @@ -0,0 +1,236 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.http.server; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import org.onap.policy.common.endpoints.http.server.internal.JettyJerseyServer; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Indexed factory implementation. + */ +class IndexedHttpServletServerFactory implements HttpServletServerFactory { + + private static final String SPACES_COMMA_SPACES = "\\s*,\\s*"; + + /** + * logger. + */ + protected static Logger logger = LoggerFactory.getLogger(IndexedHttpServletServerFactory.class); + + /** + * servers index. + */ + protected HashMap<Integer, HttpServletServer> servers = new HashMap<>(); + + @Override + public synchronized HttpServletServer build(String name, boolean https, String host, int port, String contextPath, + boolean swagger, boolean managed) { + + if (servers.containsKey(port)) { + return servers.get(port); + } + + JettyJerseyServer server = new JettyJerseyServer(name, https, host, port, contextPath, swagger); + if (managed) { + servers.put(port, server); + } + + return server; + } + + @Override + public synchronized HttpServletServer build(String name, String host, int port, String contextPath, boolean swagger, + boolean managed) { + return build(name, false, host, port, contextPath, swagger, managed); + } + + @Override + public synchronized List<HttpServletServer> build(Properties properties) { + + ArrayList<HttpServletServer> serviceList = new ArrayList<>(); + + String serviceNames = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES); + if (serviceNames == null || serviceNames.isEmpty()) { + logger.warn("No topic for HTTP Service: {}", properties); + return serviceList; + } + + List<String> serviceNameList = Arrays.asList(serviceNames.split(SPACES_COMMA_SPACES)); + + for (String serviceName : serviceNameList) { + String servicePortString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX); + + int servicePort; + try { + if (servicePortString == null || servicePortString.isEmpty()) { + if (logger.isWarnEnabled()) { + logger.warn("No HTTP port for service in {}", serviceName); + } + continue; + } + servicePort = Integer.parseInt(servicePortString); + } catch (NumberFormatException nfe) { + if (logger.isWarnEnabled()) { + logger.warn("No HTTP port for service in {}", serviceName); + } + continue; + } + + final String hostName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX); + + final String contextUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX); + + final String userName = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX); + + final String password = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX); + + final String authUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX); + + final String restClasses = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX); + + final String filterClasses = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_FILTER_CLASSES_SUFFIX); + + final String restPackages = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX); + + final String restUriPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + + "." + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX); + + final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + + "." + serviceName + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + String swaggerString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX); + boolean swagger = false; + if (swaggerString != null && !swaggerString.isEmpty()) { + swagger = Boolean.parseBoolean(swaggerString); + } + + String httpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + boolean https = false; + if (httpsString != null && !httpsString.isEmpty()) { + https = Boolean.parseBoolean(httpsString); + } + + String aafString = properties.getProperty(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + + serviceName + PolicyEndPointProperties.PROPERTY_AAF_SUFFIX); + boolean aaf = false; + if (aafString != null && !aafString.isEmpty()) { + aaf = Boolean.parseBoolean(aafString); + } + + HttpServletServer service = build(serviceName, https, hostName, servicePort, contextUriPath, swagger, + managed); + + /* authentication method either AAF or HTTP Basic Auth */ + + if (aaf) { + service.setAafAuthentication(contextUriPath); + } else if (userName != null && !userName.isEmpty() && password != null && !password.isEmpty()) { + service.setBasicAuthentication(userName, password, authUriPath); + } + + if (filterClasses != null && !filterClasses.isEmpty()) { + List<String> filterClassesList = Arrays.asList(filterClasses.split(SPACES_COMMA_SPACES)); + for (String filterClass : filterClassesList) { + service.addFilterClass(restUriPath, filterClass); + } + } + + if (restClasses != null && !restClasses.isEmpty()) { + List<String> restClassesList = Arrays.asList(restClasses.split(SPACES_COMMA_SPACES)); + for (String restClass : restClassesList) { + service.addServletClass(restUriPath, restClass); + } + } + + if (restPackages != null && !restPackages.isEmpty()) { + List<String> restPackageList = Arrays.asList(restPackages.split(SPACES_COMMA_SPACES)); + for (String restPackage : restPackageList) { + service.addServletPackage(restUriPath, restPackage); + } + } + + serviceList.add(service); + } + + return serviceList; + } + + @Override + public synchronized HttpServletServer get(int port) { + + if (servers.containsKey(port)) { + return servers.get(port); + } + + throw new IllegalArgumentException("Http Server for " + port + " not found"); + } + + @Override + public synchronized List<HttpServletServer> inventory() { + return new ArrayList<>(this.servers.values()); + } + + @Override + public synchronized void destroy(int port) { + + if (!servers.containsKey(port)) { + return; + } + + HttpServletServer server = servers.remove(port); + server.shutdown(); + } + + @Override + public synchronized void destroy() { + List<HttpServletServer> httpServletServers = this.inventory(); + for (HttpServletServer server : httpServletServers) { + server.shutdown(); + } + + synchronized (this) { + this.servers.clear(); + } + } + +} |