diff options
author | Ryan Young <ry303t@att.com> | 2018-04-06 17:32:41 -0400 |
---|---|---|
committer | Patrick Brady <pb071s@att.com> | 2018-04-09 17:52:45 +0000 |
commit | b96311a375b16d1c237f8e99b8eca6024638262b (patch) | |
tree | 394a32a1106cb7790f10e7ee4aca12b2857e26e1 /appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java | |
parent | 674e3b93deddb432513f5860ebf595367f2ea292 (diff) |
Enhance DMaaP Adapter Configuration
Change-Id: I5385cf2710fc33a85da9a67d5f4d31dce1e460aa
Signed-off-by: Ryan Young <ry303t@att.com>
Issue-ID: APPC-658
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java')
3 files changed, 165 insertions, 90 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java index 6f907ae20..40ee1c71f 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP : APPC * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Copyright (C) 2017 Amdocs * ============================================================================= @@ -18,7 +18,6 @@ * See the License for the specific language governing permissions and * limitations under the License. * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. * ============LICENSE_END========================================================= */ @@ -50,27 +49,26 @@ import org.osgi.framework.ServiceReference; public class DmaapConsumerImpl implements Consumer { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class); - private final Configuration configuration = ConfigurationFactory.getConfiguration(); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class); + private final Configuration configuration = ConfigurationFactory.getConfiguration(); // Default values - private static final int DEFAULT_TIMEOUT_MS = 60000; - private static final int DEFAULT_LIMIT = 1000; - private String topic; - private boolean isMetricEnabled = false; - private boolean useHttps = false; - private MetricRegistry metricRegistry; - private MRConsumer client = null; - private Properties props = null; - + private static final int DEFAULT_TIMEOUT_MS = 60000; + private static final int DEFAULT_LIMIT = 1000; + private String topic; + private boolean isMetricEnabled = false; + private boolean useHttps = false; + private MetricRegistry metricRegistry; + private MRConsumer client = null; + private Properties props = null; public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId, - String user, String password) { + String user, String password) { - this(urls, topicName, consumerGroupName, consumerId,user, password,null); + this(urls, topicName, consumerGroupName, consumerId, user, password, null); } public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId, - String user, String password, String filter) { + String user, String password, String filter) { this.topic = topicName; this.props = new Properties(); @@ -78,8 +76,13 @@ public class DmaapConsumerImpl implements Consumer { props.setProperty("host", urlsStr); props.setProperty("group", consumerGroupName); props.setProperty("id", consumerId); - props.setProperty("username", user); - props.setProperty("password", password); + if (user != null && password != null) { + props.setProperty("username", user); + props.setProperty("password", password); + } else { + props.setProperty("TransportType", "HTTPNOAUTH"); + } + if (filter != null) { props.setProperty("filter", filter); } @@ -92,22 +95,17 @@ public class DmaapConsumerImpl implements Consumer { metricRegistry = metricService.createRegistry("APPC"); DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory() - .dmaapRequestCounterBuilder() - .withName("DMAAP_KPI").withType(MetricType.COUNTER) - .withRecievedMessage(0) - .withPublishedMessage(0) - .build(); + .dmaapRequestCounterBuilder().withName("DMAAP_KPI").withType(MetricType.COUNTER) + .withRecievedMessage(0).withPublishedMessage(0).build(); if (metricRegistry.register(dmaapKpiMetric)) { - Metric[] metrics = new Metric[]{dmaapKpiMetric}; + Metric[] metrics = new Metric[] { dmaapKpiMetric }; LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); LogPublisher[] logPublishers = new LogPublisher[1]; logPublishers[0] = logPublisher; PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory() - .scheduledPolicyBuilder().withPublishers(logPublishers) - .withMetrics(metrics) - .build(); + .scheduledPolicyBuilder().withPublishers(logPublishers).withMetrics(metrics).build(); LOG.debug("Policy getting initialized"); manuallyScheduledPublishingPolicy.init(); @@ -121,12 +119,12 @@ public class DmaapConsumerImpl implements Consumer { */ private synchronized MRConsumer getClient(int waitMs, int limit) { try { - props.setProperty("timeout",String.valueOf(waitMs)); - props.setProperty("limit",String.valueOf(limit)); - String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props); + props.setProperty("timeout", String.valueOf(waitMs)); + props.setProperty("limit", String.valueOf(limit)); + String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic, props); return MRClientFactory.createConsumer(topicProducerPropFileName); } catch (IOException e1) { - LOG.error("failed to createConsumer",e1); + LOG.error("failed to createConsumer", e1); return null; } } @@ -134,8 +132,8 @@ public class DmaapConsumerImpl implements Consumer { @Override public synchronized void updateCredentials(String key, String secret) { LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); - props.setProperty("user",String.valueOf(key)); - props.setProperty("password",String.valueOf(secret)); + props.setProperty("username", String.valueOf(key)); + props.setProperty("password", String.valueOf(secret)); client = null; } @@ -227,4 +225,11 @@ public class DmaapConsumerImpl implements Consumer { } } + public Properties getProperties() { + return props; + } + + public boolean isHttps() { + return useHttps; + } } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java index 3fbfc95c6..3a39a98db 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java @@ -53,21 +53,20 @@ import org.osgi.framework.ServiceReference; public class DmaapProducerImpl implements Producer { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class); - private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class); + private static final Configuration configuration = ConfigurationFactory.getConfiguration(); - private Set<String> topics; + private Set<String> topics; - private Properties props = null; - private MetricRegistry metricRegistry; - private boolean useHttps = false; - private boolean isMetricEnabled = false; - - private Set<MRBatchingPublisher> clients; + private Properties props = null; + private MetricRegistry metricRegistry; + private boolean useHttps = false; + private boolean isMetricEnabled = false; + + private Set<MRBatchingPublisher> clients; - public DmaapProducerImpl(Collection<String> urls, String topicName, String user, String password) { - this(urls, (Set<String>)null, user, password); + this(urls, (Set<String>) null, user, password); this.topics = new HashSet<>(); if (topicName != null) { Collections.addAll(topics, topicName.split(",")); @@ -76,15 +75,19 @@ public class DmaapProducerImpl implements Producer { public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) { topics = topicNames; - if (urls == null || user == null || password == null) { - throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password"); + if (urls == null) { + throw new IllegalArgumentException("Mandaory argument is null: urls"); } this.props = new Properties(); String urlsStr = StringUtils.join(urls, ','); - props.setProperty("host",urlsStr); + props.setProperty("host", urlsStr); props.setProperty("id", UUID.randomUUID().toString()); - props.setProperty("username",user); - props.setProperty("password",password); + if (user != null && password != null) { + props.setProperty("username", user); + props.setProperty("password", password); + } else { + props.setProperty("TransportType", "HTTPNOAUTH"); + } } private void initMetric() { @@ -94,23 +97,17 @@ public class DmaapProducerImpl implements Producer { metricRegistry = metricService.createRegistry("APPC"); DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory() - .dmaapRequestCounterBuilder() - .withName("DMAAP_KPI").withType(MetricType.COUNTER) - .withRecievedMessage(0) - .withPublishedMessage(0) - .build(); + .dmaapRequestCounterBuilder().withName("DMAAP_KPI").withType(MetricType.COUNTER) + .withRecievedMessage(0).withPublishedMessage(0).build(); if (metricRegistry.register(dmaapKpiMetric)) { - Metric[] metrics = new Metric[]{dmaapKpiMetric}; + Metric[] metrics = new Metric[] { dmaapKpiMetric }; LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); LogPublisher[] logPublishers = new LogPublisher[1]; logPublishers[0] = logPublisher; PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory() - .scheduledPolicyBuilder() - .withPublishers(logPublishers) - .withMetrics(metrics) - .build(); + .scheduledPolicyBuilder().withPublishers(logPublishers).withMetrics(metrics).build(); LOG.debug("Policy getting initialized"); manuallyScheduledPublishingPolicy.init(); @@ -123,7 +120,7 @@ public class DmaapProducerImpl implements Producer { Set<MRBatchingPublisher> out = new HashSet<>(); for (String topic : topics) { try { - String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props); + String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic, props); final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher(topicProducerPropFileName); out.add(client); } catch (Exception e) { @@ -136,7 +133,7 @@ public class DmaapProducerImpl implements Producer { @Override public synchronized void updateCredentials(String key, String secret) { LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); - props.setProperty("user", String.valueOf(key)); + props.setProperty("username", String.valueOf(key)); props.setProperty("password", String.valueOf(secret)); clients = null; } @@ -145,20 +142,20 @@ public class DmaapProducerImpl implements Producer { public boolean post(String partition, String data) { boolean success = true; Properties properties = configuration.getProperties(); - if (properties != null && properties.getProperty("metric.enabled") != null ) { + if (properties != null && properties.getProperty("metric.enabled") != null) { isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled")); } if (isMetricEnabled) { initMetric(); } - - // Create clients once and reuse them on subsequent posts. This is + + // Create clients once and reuse them on subsequent posts. This is // to support failover to other servers in the Dmaap cluster. if ((clients == null) || (clients.isEmpty())) { LOG.info("Getting CambriaBatchingPublisher Clients ..."); clients = getClients(); } - + for (MRBatchingPublisher client : clients) { try { LOG.debug(String.format("Posting %s to %s", data, client)); @@ -190,7 +187,7 @@ public class DmaapProducerImpl implements Producer { for (MRBatchingPublisher client : clients) { try { client.close(1, TimeUnit.SECONDS); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException e) { LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client), e); } } @@ -214,4 +211,11 @@ public class DmaapProducerImpl implements Producer { } } + public Properties getProperties() { + return props; + } + + public boolean isHttps() { + return useHttps; + } } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapUtil.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapUtil.java index 7a65311df..72e0a265b 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapUtil.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapUtil.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP : APPC * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Copyright (C) 2017 Amdocs * ============================================================================= @@ -18,72 +18,138 @@ * See the License for the specific language governing permissions and * limitations under the License. * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. * ============LICENSE_END========================================================= */ package org.onap.appc.adapter.messaging.dmaap.impl; import java.io.File; +import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.Properties; +import org.onap.appc.configuration.Configuration; +import org.onap.appc.configuration.ConfigurationFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class DmaapUtil { - private static final char DELIMITER = '_'; + private static final char DELIMITER = '_'; + + static final String DMAAP_PROPERTIES_PATH = "org.onap.appc.dmaap.profile.path"; + + private static final Logger log = LoggerFactory.getLogger(DmaapUtil.class); private DmaapUtil() { } private static String createPreferredRouteFileIfNotExist(String topic) throws IOException { String topicPreferredRouteFileName; - topicPreferredRouteFileName = topic+"preferredRoute.properties"; - File fo= new File(topicPreferredRouteFileName); - if(!fo.exists()) { + topicPreferredRouteFileName = topic + "preferredRoute.properties"; + File fo = new File(topicPreferredRouteFileName); + if (!fo.exists()) { ClassLoader classLoader = DmaapUtil.class.getClassLoader(); InputStream inputStream = classLoader.getResourceAsStream("preferredRoute.txt"); Properties props = new Properties(); props.load(inputStream); - String fileName = topic != null ? topic+ DELIMITER +"MR1" : DELIMITER +"MR1"; + String fileName = topic != null ? topic + DELIMITER + "MR1" : DELIMITER + "MR1"; props.setProperty("preferredRouteKey", fileName); topicPreferredRouteFileName = topic + "preferredRoute.properties"; - props.store(new FileOutputStream(topicPreferredRouteFileName), "preferredRoute.properties file created on the fly for topic:" + topic + " on:" + System.currentTimeMillis()); + props.store(new FileOutputStream(topicPreferredRouteFileName), + "preferredRoute.properties file created on the fly for topic:" + topic + " on:" + + System.currentTimeMillis()); } return topicPreferredRouteFileName; } - public static String createConsumerPropFile(String topic, Properties props)throws IOException { + public static String createConsumerPropFile(String topic, Properties props) throws IOException { String defaultProfFileName = "consumer.properties"; - return createConsumerProducerPropFile(topic, defaultProfFileName,props); + + log.debug("Creating DMaaP Consumer Property File for topic " + topic); + return createConsumerProducerPropFile(topic, defaultProfFileName, props); } - public static String createProducerPropFile(String topic, Properties props)throws IOException { + public static String createProducerPropFile(String topic, Properties props) throws IOException { String defaultProfFileName = "producer.properties"; - return createConsumerProducerPropFile(topic, defaultProfFileName,props); + + log.debug("Creating DMaaP Producer Property File for topic " + topic); + return createConsumerProducerPropFile(topic, defaultProfFileName, props); } - private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props) throws IOException { - ClassLoader classLoader = DmaapUtil.class.getClassLoader(); - InputStream inputStream = classLoader.getResourceAsStream(defaultProfFileName); - Properties defaultProps = new Properties(); - defaultProps.load(inputStream); - defaultProps.setProperty("topic",topic); + private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props) + throws IOException { + Properties defaultProps = getDefaultProperties(defaultProfFileName); + + defaultProps.setProperty("topic", topic); String preferredRouteFileName = DmaapUtil.createPreferredRouteFileIfNotExist(topic); - if(props != null && !props.isEmpty()){ + if (props != null && !props.isEmpty()) { defaultProps.putAll(props); } - defaultProps.setProperty("topic",topic); - defaultProps.setProperty("DME2preferredRouterFilePath",preferredRouteFileName); + defaultProps.setProperty("topic", topic); + defaultProps.setProperty("DME2preferredRouterFilePath", preferredRouteFileName); String id = defaultProps.getProperty("id"); String topicConsumerPropFileName = defaultProfFileName; - topicConsumerPropFileName = id != null ? id+ DELIMITER +topicConsumerPropFileName : DELIMITER +topicConsumerPropFileName; - topicConsumerPropFileName = topic != null ? topic+ DELIMITER +topicConsumerPropFileName : DELIMITER +topicConsumerPropFileName; + topicConsumerPropFileName = id != null ? id + DELIMITER + topicConsumerPropFileName + : DELIMITER + topicConsumerPropFileName; + topicConsumerPropFileName = topic != null ? topic + DELIMITER + topicConsumerPropFileName + : DELIMITER + topicConsumerPropFileName; - defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName+" file created on the fly for topic:"+topic+" on:"+System.currentTimeMillis()); + defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName + + " file created on the fly for topic:" + topic + " on:" + System.currentTimeMillis()); return topicConsumerPropFileName; } + private static Properties getDefaultProperties(String profileName) { + Properties props = new Properties(); + + // use appc configuration to get all properties which includes + // appc.properties and system properties + // allowing variable to be set in any location + Configuration config = ConfigurationFactory.getConfiguration(); + String dmaapPropPath = config.getProperty(DMAAP_PROPERTIES_PATH); + + if (dmaapPropPath != null) { + // load from file system + + File profileFile = new File(dmaapPropPath, profileName); + FileInputStream inputStream = null; + + log.info("Loading DMaaP Profile from " + profileFile.getAbsolutePath()); + + if (profileFile.exists()) { + try { + inputStream = new FileInputStream(profileFile); + props.load(inputStream); + } catch (IOException e) { + log.error("Exception loading DMaaP Profile from " + profileFile.getAbsolutePath(), e); + } finally { + try { + if (inputStream != null) { + inputStream.close(); + } + } catch (IOException ex) { + log.warn("Exception closing DMaaP Profile file " + profileFile.getAbsolutePath(), ex); + } + } + } + } + if (props.isEmpty()) { + // load default Profile from class + log.info("Loading Default DMaaP Profile"); + + ClassLoader classLoader = DmaapUtil.class.getClassLoader(); + InputStream inputStream = classLoader.getResourceAsStream(profileName); + try { + props.load(inputStream); + } catch (IOException e) { + log.error("Exception loading Default DMaaP Profile", e); + } + } + + return props; + } } |