diff options
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter')
42 files changed, 2105 insertions, 97 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/pom.xml b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/pom.xml index 44ca150d1..449ce92d2 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/pom.xml +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/pom.xml @@ -14,13 +14,17 @@ <dependencies> <dependency> <groupId>org.openecomp.appc</groupId> + <artifactId>appc-message-adapter-api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.openecomp.appc</groupId> <artifactId>appc-metric-bundle</artifactId> - <version>1.1.0-SNAPSHOT</version> + <version>${project.version}</version> </dependency> <dependency> <groupId>org.openecomp.appc</groupId> <artifactId>appc-common</artifactId> - <classifier>jar-with-dependencies</classifier> <version>${project.version}</version> </dependency> @@ -65,7 +69,6 @@ <dependency> <groupId>org.powermock</groupId> <artifactId>powermock-api-mockito</artifactId> - <version>1.6.2</version> <scope>test</scope> </dependency> <dependency> @@ -85,10 +88,17 @@ <dependency> <groupId>org.powermock</groupId> <artifactId>powermock-module-junit4</artifactId> - <version>1.6.2</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + </dependency> <dependency> <groupId>org.openecomp.sdnc.core</groupId> <artifactId>sli-common</artifactId> @@ -119,6 +129,14 @@ </exclusions> </dependency> + <!-- DMaaP Client --> + <dependency> + <groupId>com.att.nsa</groupId> + <artifactId>dmaapClient</artifactId> + <version>0.2.12</version> +<!-- <version>${dmaap.client.version}</version> --> + </dependency> + <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> @@ -129,8 +147,6 @@ </dependency> </dependencies> - - <build> <plugins> <plugin> @@ -139,12 +155,13 @@ <extensions>true</extensions> <configuration> <instructions> - <Bundle-SymbolicName>org.openecomp.appc.adapter.dmaap</Bundle-SymbolicName> - <Bundle-Activator>org.openecomp.appc.adapter.dmaap.AppcDmaapAdapterActivator</Bundle-Activator> - <Export-Package>org.openecomp.appc.adapter.dmaap.*</Export-Package> - <Export-Serice>org.openecomp.appc.adapter.dmaap.EventSender</Export-Serice> - <Import-Package>org.openecomp.appc.metricservice.*,org.openecomp.sdnc.sli.*,org.osgi.framework.*,!org.osgi.service.event.*,org.osgi.service.*,org.osgi.util.*,org.slf4j.*,com.vmware.*,org.apache.xerces.*,javax.net.ssl.*,org.xml.sax.*,javax.xml.*,javax.naming.*,javax.crypto.*, com.sun.jersey.spi.container.servlet,org.eclipse.jetty.servlets</Import-Package> - <Embed-Dependency>*;scope=compile|runtime;artifactId=!appc-metric-bundle|sli-common|org.eclipse.osgi|slf4j-api|jcl-over-slf4j|mysql-connector-java|xml-apis|pax-*</Embed-Dependency> + <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName> + <Bundle-Version>${project.version}</Bundle-Version> + <!--<Bundle-SymbolicName>org.openecomp.appc.adapter.messaging.dmaap</Bundle-SymbolicName>--> + <Bundle-Activator>org.openecomp.appc.adapter.messaging.dmaap.AppcDmaapAdapterActivator</Bundle-Activator> + <Export-Package>org.openecomp.appc.adapter.messaging.*</Export-Package> + <Import-Package>!org.slf4j.event,org.openecomp.appc.adapter.message.*,org.openecomp.appc.metricservice.*,com.att.nsa.*,org.openecomp.sdnc.sli.*,org.osgi.framework.*,!org.osgi.service.event.*,org.osgi.service.*,org.osgi.util.*,org.slf4j.*,com.vmware.*,org.apache.xerces.*,javax.net.ssl.*,org.xml.sax.*,javax.xml.*,javax.naming.*,javax.crypto.*, com.sun.jersey.spi.container.servlet,org.eclipse.jetty.servlets</Import-Package> + <Embed-Dependency>*;scope=compile|runtime;artifactId=!appc-metric-bundle|appc-message-adapter-api|sli-common|org.eclipse.osgi|slf4j-api|jcl-over-slf4j|mysql-connector-java|xml-apis|pax-*</Embed-Dependency> <Embed-Transitive>true</Embed-Transitive> <Bundle-Blueprint>OSGI-INF/blueprint/blueprint.xml</Bundle-Blueprint> </instructions> diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java index c02553dfe..c7be330cf 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap; import org.openecomp.appc.configuration.ConfigurationFactory; import com.att.eelf.configuration.EELFLogger; diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java index 654ec6f7f..0d0450681 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap.http; import java.net.URI; diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java index 6e16d896b..2145eaa70 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java @@ -19,14 +19,13 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap.http; import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.List; - import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -38,10 +37,11 @@ import org.apache.http.client.utils.URIBuilder; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.json.JSONArray; +import org.openecomp.appc.adapter.message.Consumer; -public class DmaapConsumer extends CommonHttpClient implements Consumer { +public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumer.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); // Default values private static final int DEFAULT_TIMEOUT_MS = 15000; @@ -54,17 +54,17 @@ public class DmaapConsumer extends CommonHttpClient implements Consumer { private boolean useHttps = false; - public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId) { + public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId) { this(hosts, topicName, consumerName, consumerId, null); } - public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId, - String filter) { + public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, + String filter) { this(hosts, topicName, consumerName, consumerId, filter, null, null); } - public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId, - String filter, String user, String password) { + public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, + String filter, String user, String password) { urls = new ArrayList<String>(); for (String host : hosts) { urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId)); @@ -155,9 +155,10 @@ public class DmaapConsumer extends CommonHttpClient implements Consumer { LOG.error("Interrupted while sleeping"); } } - - public void close(){ - //not used yet - } + + @Override + public void close() { + // Nothing to do + } } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java index 6845177b1..85e446dca 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap.http; import java.net.URI; import java.util.ArrayList; @@ -34,10 +34,11 @@ import com.att.eelf.configuration.EELFManager; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; +import org.openecomp.appc.adapter.message.Producer; -public class DmaapProducer extends CommonHttpClient implements Producer { +public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducer.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class); private static final String CONTENT_TYPE = "application/cambria"; private static final String URL_TEMPLATE = "%s/events/%s"; @@ -47,7 +48,7 @@ public class DmaapProducer extends CommonHttpClient implements Producer { private boolean useHttps = false; - public DmaapProducer(Collection<String> urls, String topicName) { + public HttpDmaapProducerImpl(Collection<String> urls, String topicName) { hosts = new ArrayList<String>(); topics = new HashSet<String>(); topics.add(topicName); @@ -57,7 +58,7 @@ public class DmaapProducer extends CommonHttpClient implements Producer { } } - public DmaapProducer(Collection<String> urls, Set<String> topicNames) { + public HttpDmaapProducerImpl(Collection<String> urls, Set<String> topicNames) { hosts = new ArrayList<String>(); topics = topicNames; @@ -126,8 +127,9 @@ public class DmaapProducer extends CommonHttpClient implements Producer { String m = (msg == null) ? "" : msg; return String.format("%d.%d.%s%s", p.length(), m.length(), p, m); } - - public void close(){ - //not used yet - } + + @Override + public void close() { + // Nothing to do + } } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java new file mode 100644 index 000000000..342d52448 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java @@ -0,0 +1,231 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import java.io.IOException; +import java.util.*; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +//import com.att.nsa.cambria.client.CambriaClientBuilders; +//import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +//import com.att.nsa.cambria.client.CambriaConsumer; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import org.apache.commons.lang3.StringUtils; +import org.openecomp.appc.adapter.message.Consumer; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; +import org.openecomp.appc.metricservice.MetricRegistry; +import org.openecomp.appc.metricservice.MetricService; +import org.openecomp.appc.metricservice.impl.MetricServiceImpl; +import org.openecomp.appc.metricservice.metric.Metric; +import org.openecomp.appc.metricservice.metric.MetricType; +import org.openecomp.appc.metricservice.metric.DmaapRequestCounterMetric; +import org.openecomp.appc.metricservice.policy.PublishingPolicy; +import org.openecomp.appc.metricservice.publisher.LogPublisher; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.ServiceReference; + +public class DmaapConsumerImpl implements Consumer { + + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class); + private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + // Default values + private static final int DEFAULT_TIMEOUT_MS = 60000; + private static final int DEFAULT_LIMIT = 1000; + private static MetricRegistry metricRegistry; + private String topic; + private DmaapRequestCounterMetric dmaapKpiMetric; + private boolean isMetricEnabled=false; + private boolean useHttps = false; + private MRConsumer client = null; + private Properties props = null; + + + public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password) { + this(urls, topicName, consumerGroupName, consumerId,user, password,null); + + } + + public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password,String filter) { + this.topic = topicName; + this.props = new Properties(); + String urlsStr = StringUtils.join(urls, ','); + props.setProperty("host",urlsStr); + props.setProperty("group",consumerGroupName); + props.setProperty("id",consumerId); + props.setProperty("username",user); + props.setProperty("password",password); + if(filter != null) { + props.setProperty("filter", filter); + } + } + + + private void initMetric() { + LOG.debug("Metric getting initialized"); + MetricService metricService = getMetricservice(); + metricRegistry = metricService.createRegistry("APPC"); + dmaapKpiMetric = metricRegistry.metricBuilderFactory(). + dmaapRequestCounterBuilder(). + withName("DMAAP_KPI").withType(MetricType.COUNTER). + withRecievedMessage(0) + .withPublishedMessage(0) + .build(); + if (metricRegistry.register(dmaapKpiMetric)) { + Metric[] metrics = new Metric[]{dmaapKpiMetric}; + LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); + LogPublisher[] logPublishers = new LogPublisher[1]; + logPublishers[0] = logPublisher; + PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). + scheduledPolicyBuilder().withPublishers(logPublishers). + withMetrics(metrics). + build(); + LOG.debug("Policy getting initialized"); + manuallyScheduledPublishingPolicy.init(); + LOG.debug("Metric initialized"); + } + } + private MRConsumer getClient() { + return getClient(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); + } + + /** + * @return An instance of MRConsumer created from our class variables + */ + private synchronized MRConsumer getClient(int waitMs, int limit) { + try { + props.setProperty("timeout",String.valueOf(waitMs)); + props.setProperty("limit",String.valueOf(limit)); + String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props); + return MRClientFactory.createConsumer ( topicProducerPropFileName); + } catch (IOException e1) { + LOG.error("failed to createConsumer",e1); + return null; + } + } + + @Override + public synchronized void updateCredentials(String key, String secret) { + LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); + String user = key; + String password = secret; + props.setProperty("user",String.valueOf(user)); + props.setProperty("password",String.valueOf(password)); + client = null; + } + + @Override + public List<String> fetch(int waitMs, int limit) { + Properties properties=configuration.getProperties(); + if(properties!=null && properties.getProperty("metric.enabled")!=null ){ + isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); + } + if(isMetricEnabled){ + initMetric(); + } + LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); + List<String> out = new ArrayList<String>(); + + // Create client once and reuse it on subsequent fetches. This is + // to support failover to other servers in the DMaaP cluster. + if (client == null) { + LOG.info("Getting DMaaP Client ..."); + client = getClient(waitMs, limit); + } + try { + for (String s : client.fetch(waitMs, limit)) { + out.add(s); + if(isMetricEnabled){ + ((DmaapRequestCounterMetric)metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage(); + } + } + LOG.debug(String.format("Got %d records from %s", out.size(), this.toString())); + } catch (Exception e) { + // Connection exception + LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage())); + e.printStackTrace(); + try { + LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs)); + Thread.sleep(waitMs); + } catch (InterruptedException e2) { + LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs)); + } + } + + + return out; + } + + /** + * Close consumer Dmaap client + */ + @Override + public void close() { + LOG.debug("Closing Dmaap consumer client...."); + if (client != null) { + client.close(); + } + } + + @Override + public List<String> fetch() { + return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); + } + + @Override + public String toString() { + String hostStr = (props == null || props.getProperty("host") == null? "N/A" : props.getProperty("host")); + String group = (props == null || props.getProperty("group") == null? "N/A" : props.getProperty("group")); + String id = (props == null || props.getProperty("id") == null? "N/A" : props.getProperty("id")); + return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr); + } + + @Override + public void useHttps(boolean yes) { + useHttps = yes; + } + + + private MetricService getMetricservice() { + BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); + // Get AAIadapter reference + ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); + if (sref != null) { + LOG.info("Metric Service from bundlecontext"); + return (MetricServiceImpl) bctx.getService(sref); + + } else { + LOG.info("Metric Service error from bundlecontext"); + LOG.warn("Cannot find service reference for org.openecomp.appc.metricservice.MetricService"); + return null; + + } + } + + public Metric getMetric(String name){ + return metricRegistry.metric(name); + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java new file mode 100644 index 000000000..79d6b3db7 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java @@ -0,0 +1,220 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import java.io.*; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +//import com.att.nsa.cambria.client.CambriaBatchingPublisher; +//import com.att.nsa.cambria.client.CambriaClientBuilders; +//import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import org.apache.commons.lang3.StringUtils; +import org.openecomp.appc.adapter.message.Producer; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapUtil; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; +import org.openecomp.appc.metricservice.MetricRegistry; +import org.openecomp.appc.metricservice.MetricService; +import org.openecomp.appc.metricservice.metric.Metric; +import org.openecomp.appc.metricservice.metric.MetricType; +import org.openecomp.appc.metricservice.metric.DmaapRequestCounterMetric; +import org.openecomp.appc.metricservice.policy.PublishingPolicy; +import org.openecomp.appc.metricservice.publisher.LogPublisher; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.ServiceReference; + +public class DmaapProducerImpl implements Producer { + + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class); + private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + + private Set<String> topics = new HashSet<String>(); + + private Properties props = null; + private static MetricRegistry metricRegistry; + private boolean useHttps = false; + private DmaapRequestCounterMetric dmaapKpiMetric; + private boolean isMetricEnabled=false; + + private Set<MRBatchingPublisher> clients; + + + public DmaapProducerImpl(Collection<String> urls, String topicName, String user, String password) { + this(urls, (Set<String>)null, user, password); + this.topics = new HashSet<>(); + if (topicName != null) { + for (String topic : topicName.split(",")) { + topics.add(topic); + } + } + } + + public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) { + topics = topicNames; + if(urls == null || user == null || password == null){ + throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password" ); + } + this.props = new Properties(); + String urlsStr = StringUtils.join(urls, ','); + props.setProperty("host",urlsStr); + props.setProperty("id", UUID.randomUUID().toString()); + props.setProperty("username",user); + props.setProperty("password",password); + } + private void initMetric() { + LOG.debug("Metric getting initialized"); + MetricService metricService = getMetricservice(); + metricRegistry=metricService.createRegistry("APPC"); + dmaapKpiMetric = metricRegistry.metricBuilderFactory(). + dmaapRequestCounterBuilder(). + withName("DMAAP_KPI").withType(MetricType.COUNTER). + withRecievedMessage(0) + .withPublishedMessage(0) + .build(); + if(metricRegistry.register(dmaapKpiMetric)) { + Metric[] metrics = new Metric[]{dmaapKpiMetric}; + LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); + LogPublisher[] logPublishers = new LogPublisher[1]; + logPublishers[0] = logPublisher; + PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). + scheduledPolicyBuilder().withPublishers(logPublishers). + withMetrics(metrics). + build(); + LOG.debug("Policy getting initialized"); + manuallyScheduledPublishingPolicy.init(); + LOG.debug("Metric initialized"); + } + + } + private Set<MRBatchingPublisher> getClients() { + Set<MRBatchingPublisher> out = new HashSet<MRBatchingPublisher>(); + for (String topic : topics) { + try { + String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props); + final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher (topicProducerPropFileName); + out.add(client); + } catch (Exception e) { + e.printStackTrace(); + } + } + + return out; + } + + @Override + public synchronized void updateCredentials(String key, String secret) { + LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); + String user = key; + String password = secret; + props.setProperty("user",String.valueOf(user)); + props.setProperty("password",String.valueOf(password)); + clients = null; + } + + @Override + public boolean post(String partition, String data) { + boolean success = true; + Properties properties=configuration.getProperties(); + if(properties!=null && properties.getProperty("metric.enabled")!=null ){ + isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); + } + if(isMetricEnabled){ + initMetric(); + } + + // Create clients once and reuse them on subsequent posts. This is + // to support failover to other servers in the Dmaap cluster. + if ((clients == null) || (clients.isEmpty())) { + LOG.info("Getting CambriaBatchingPublisher Clients ..."); + clients = getClients(); + } + + for (MRBatchingPublisher client : clients) { + try { + LOG.debug(String.format("Posting %s to %s", data, client)); + client.send(partition, data); + } catch (IOException e) { + e.printStackTrace(); + success = false; + } + } + if(isMetricEnabled){ + ( (DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage(); + } + return success; + } + + /** + * Close producer Dmaap client + */ + @Override + public void close() { + if ((clients == null) || (clients.isEmpty())) { + return; + } + + LOG.debug("Closing Dmaap producer clients...."); + for (MRBatchingPublisher client : clients) { + try { + client.close(1, TimeUnit.SECONDS); + } catch (IOException | InterruptedException e) { + LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client)); + e.printStackTrace(); + } + } + } + + @Override + public void useHttps(boolean yes) { + useHttps = yes; + } + + private MetricService getMetricservice() { +/* + return AppcDmaapAdapterActivator.getMetricService(); +*/ + + BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); + ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); + if (sref != null) { + LOG.info("Metric Service from bundlecontext"); + return (MetricService) bctx.getService(sref); + + } else { + LOG.info("Metric Service error from bundlecontext"); + LOG.warn("Cannot find service reference for org.openecomp.appc.metricservice.MetricService"); + return null; + + } + } + + public Metric getMetric(String name){ + return metricRegistry.metric(name); + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java new file mode 100644 index 000000000..39857b856 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java @@ -0,0 +1,83 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +public class DmaapUtil { + private final static String delimiter = "_"; + private static String createPreferredRouteFileIfNotExist(String topic) throws IOException { + String topicPreferredRouteFileName = null; + topicPreferredRouteFileName = topic+"preferredRoute.properties"; + File fo= new File(topicPreferredRouteFileName); + if(!fo.exists()) { + ClassLoader classLoader = DmaapUtil.class.getClassLoader(); + InputStream inputStream = classLoader.getResourceAsStream("preferredRoute.txt"); + Properties props = new Properties(); + props.load(inputStream); + String fileName = topic != null ? topic+delimiter+"MR1" : delimiter+"MR1"; + props.setProperty("preferredRouteKey", fileName); + topicPreferredRouteFileName = topic + "preferredRoute.properties"; + props.store(new FileOutputStream(topicPreferredRouteFileName), "preferredRoute.properties file created on the fly for topic:" + topic + " on:" + System.currentTimeMillis()); + } + return topicPreferredRouteFileName; + } + + public static String createConsumerPropFile(String topic, Properties props)throws IOException { + String defaultProfFileName = "consumer.properties"; + String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props); + return topicConsumerPropFileName; + } + + public static String createProducerPropFile(String topic, Properties props)throws IOException { + String defaultProfFileName = "producer.properties"; + String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props); + return topicConsumerPropFileName; + } + + private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props) throws IOException { + ClassLoader classLoader = DmaapUtil.class.getClassLoader(); + InputStream inputStream = classLoader.getResourceAsStream(defaultProfFileName); + Properties defaultProps = new Properties(); + defaultProps.load(inputStream); + defaultProps.setProperty("topic",topic); + + String preferredRouteFileName = DmaapUtil.createPreferredRouteFileIfNotExist(topic); + if(props != null && !props.isEmpty()){ + defaultProps.putAll(props); + } + defaultProps.setProperty("topic",topic); + defaultProps.setProperty("DME2preferredRouterFilePath",preferredRouteFileName); + String id = defaultProps.getProperty("id"); + String topicConsumerPropFileName = defaultProfFileName; + topicConsumerPropFileName = id != null ? id+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName; + topicConsumerPropFileName = topic != null ? topic+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName; + + defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName+" file created on the fly for topic:"+topic+" on:"+System.currentTimeMillis()); + return topicConsumerPropFileName; + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java index 0f7d40f5b..c671c6fdb 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java @@ -19,33 +19,33 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap.impl; +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.openecomp.sdnc.sli.SvcLogicContext; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import org.openecomp.appc.adapter.dmaap.EventSender; -import org.openecomp.appc.adapter.dmaap.Producer; -import org.openecomp.appc.adapter.dmaap.DmaapDestination; -import org.openecomp.appc.adapter.dmaap.event.EventHeader; -import org.openecomp.appc.adapter.dmaap.event.EventMessage; -import org.openecomp.appc.adapter.dmaap.event.EventStatus; -import org.openecomp.appc.adapter.dmaap.DmaapProducer; +import org.openecomp.appc.adapter.message.EventSender; +import org.openecomp.appc.adapter.message.MessageDestination; +import org.openecomp.appc.adapter.message.Producer; +import org.openecomp.appc.adapter.message.event.EventHeader; +import org.openecomp.appc.adapter.message.event.EventMessage; +import org.openecomp.appc.adapter.message.event.EventStatus; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; import org.openecomp.appc.configuration.Configuration; import org.openecomp.appc.configuration.ConfigurationFactory; import org.openecomp.appc.exceptions.APPCException; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.openecomp.sdnc.sli.SvcLogicContext; - -public class EventSenderImpl implements EventSender +public class EventSenderDmaapImpl implements EventSender { - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderImpl.class); - public static final String EVENT_TOPIC_WRITE = "event.topic.write"; - public static final String EVENT_CLIENT_KEY = "event.client.key"; - public static final String EVENT_CLIENT_SECRET = "event.client.secret"; - public static final String EVENT_POOL_MEMBERS = "event.pool.members"; + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class); + public static final String EVENT_TOPIC_WRITE = "dmaap.event.topic.write"; + public static final String DMAAP_USERNAME = "dmaap.appc.username"; + public static final String DMAAP_PASSWORD = "dmaap.appc.password"; + public static final String EVENT_POOL_MEMBERS = "dmaap.event.pool.members"; private static Configuration configuration = ConfigurationFactory.getConfiguration(); @@ -59,21 +59,21 @@ public class EventSenderImpl implements EventSender this.producerMap = producerMap; } - public EventSenderImpl(){ + public EventSenderDmaapImpl(){ } public void initialize(){ Properties properties = configuration.getProperties(); String writeTopic; - String apiKey; - String apiSecret; + String username; + String password; final List<String> pool = new ArrayList<>(); - for(DmaapDestination destination:DmaapDestination.values()){ + for(MessageDestination destination: MessageDestination.values()){ writeTopic = properties.getProperty(destination + "." + EVENT_TOPIC_WRITE); - apiKey = properties.getProperty(destination + "." + EVENT_CLIENT_KEY); - apiSecret = properties.getProperty(destination + "." + EVENT_CLIENT_SECRET); + username = properties.getProperty(destination + "." + DMAAP_USERNAME); + password = properties.getProperty(destination + "." + DMAAP_PASSWORD); String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS); if (hostNames != null && !hostNames.isEmpty()) { @@ -83,12 +83,8 @@ public class EventSenderImpl implements EventSender LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE)); - LOG.debug(String.format("apiKey = %s, taken from property: %s", apiKey, destination + "." + EVENT_CLIENT_KEY)); - Producer producer = new DmaapProducer(pool, writeTopic); - - if (apiKey != null && apiSecret != null) { - producer.updateCredentials(apiKey, apiSecret); - } + LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); + Producer producer = new DmaapProducerImpl(pool, writeTopic,username, password); for (String url : pool) { if (url.contains("3905") || url.contains("https")) { @@ -103,7 +99,7 @@ public class EventSenderImpl implements EventSender } @Override - public boolean sendEvent(DmaapDestination destination,EventMessage msg) { + public boolean sendEvent(MessageDestination destination, EventMessage msg) { String jsonStr = msg.toJson(); String id = msg.getEventHeader().getEventId(); LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr)); @@ -112,7 +108,43 @@ public class EventSenderImpl implements EventSender } @Override - public boolean sendEvent(DmaapDestination destination,Map<String, String> params, SvcLogicContext ctx) throws APPCException { + public boolean sendEvent(MessageDestination destination, EventMessage msg, String eventTopicName) { + String jsonStr = msg.toJson(); + String id = msg.getEventHeader().getEventId(); + LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr)); + Producer producer = createProducer(destination, eventTopicName); + return producer.post(id, jsonStr); + } + + private Producer createProducer(MessageDestination destination, String eventTopicName) { + Properties properties = configuration.getProperties(); + final List<String> pool = new ArrayList<>(); + String username = properties.getProperty(destination + "." + DMAAP_USERNAME); + String password = properties.getProperty(destination + "." + DMAAP_PASSWORD); + String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS); + + if (hostNames != null && !hostNames.isEmpty()) { + LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS)); + Collections.addAll(pool, hostNames.split(",")); + } + + LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); + LOG.debug(String.format("writeTopic = %s, taken from property: %s", eventTopicName, destination + "." + EVENT_TOPIC_WRITE)); + LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); + Producer producer = new DmaapProducerImpl(pool, eventTopicName,username, password); + + for (String url : pool) { + if (url.contains("3905") || url.contains("https")) { + LOG.debug("Producer should use HTTPS"); + producer.useHttps(true); + break; + } + } + return producer; + } + + @Override + public boolean sendEvent(MessageDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException { if (params == null) { String message = "Parameters map is empty (null)"; @@ -134,10 +166,10 @@ public class EventSenderImpl implements EventSender LOG.error(message); throw new APPCException(message); } - EventMessage dmaapEventMessage = new EventMessage( + EventMessage eventMessage = new EventMessage( new EventHeader(eventTime, apiVer, eventId), new EventStatus(code, reason)); - return sendEvent(destination,dmaapEventMessage); + return sendEvent(destination,eventMessage); } } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/OSGI-INF/blueprint/blueprint.xml index a8caf666f..eefe8e504 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -24,10 +24,10 @@ <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd"> - <bean id="eventSenderBean" class="org.openecomp.appc.adapter.dmaap.impl.EventSenderImpl" + <bean id="eventSenderBean" class="org.openecomp.appc.adapter.messaging.dmaap.impl.EventSenderDmaapImpl" init-method="initialize" scope="singleton"> </bean> - <service id="eventSenderService" interface="org.openecomp.appc.adapter.dmaap.EventSender" ref="eventSenderBean" /> + <service id="eventSenderService" interface="org.openecomp.appc.adapter.message.EventSender" ref="eventSenderBean" /> </blueprint> diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties new file mode 100644 index 000000000..facd33e4d --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties @@ -0,0 +1,57 @@ +### +# ============LICENSE_START======================================================= +# openECOMP : APP-C +# ================================================================================ +# Copyright (C) 2017 AT&T Intellectual Property. All rights +# reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH > +TransportType=HTTPAAF +Latitude =50.000000 +Longitude =-100.000000 +Version =1.0 +ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events +Environment =TEST +Partner=BOT_R +routeOffer=MR1 +SubContextPath =/ +Protocol =http +MethodType =GET +username =admin +password =admin +contenttype =application/json +authKey=01234567890abcde:01234567890abcdefghijklmn +authDate=2016-02-18T13:57:37-0800 +host=127.0.0.1 +topic=org.openecomp.appc.UNIT-TEST +group=jmsgrp +id=2 +timeout=15000 +limit=1000 +filter= +AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler +AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler +AFT_DME2_REQ_TRACE_ON=true +AFT_ENVIRONMENT=AFTUAT +AFT_DME2_EP_CONN_TIMEOUT=15000 +AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000 +AFT_DME2_EP_READ_TIMEOUT_MS=50000 +sessionstickinessrequired=NO +DME2preferredRouterFilePath=preferredRoute.txt + + + diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt new file mode 100644 index 000000000..662b0aa7d --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt @@ -0,0 +1 @@ +preferredRouteKey=MR1
\ No newline at end of file diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/producer.properties b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/producer.properties new file mode 100644 index 000000000..4901d517e --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/resources/producer.properties @@ -0,0 +1,52 @@ +### +# ============LICENSE_START======================================================= +# openECOMP : APP-C +# ================================================================================ +# Copyright (C) 2017 AT&T Intellectual Property. All rights +# reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH > +TransportType=HTTPAAF +Latitude =50.000000 +Longitude =-100.000000 +Version =1.0 +ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events +Environment =TEST +Partner=BOT_R +SubContextPath =/ +Protocol =http +MethodType =POST +username =admin +password =admin +contenttype = application/json +authKey=01234567890abcde:01234567890abcdefghijklmn +authDate=2016-07-20T11:30:56-0700 +host=127.0.0.1 +topic=org.openecomp.appc.UNIT-TEST +partition=2 +maxBatchSize=100 +maxAgeMs=250 +AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler +AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler +AFT_DME2_REQ_TRACE_ON=true +AFT_ENVIRONMENT=AFTUAT +AFT_DME2_EP_CONN_TIMEOUT=15000 +AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000 +AFT_DME2_EP_READ_TIMEOUT_MS=50000 +sessionstickinessrequired=NO +DME2preferredRouterFilePath=preferredRoute.txt +MessageSentThreadOccurance=50 diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java new file mode 100644 index 000000000..a0ae92ea8 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java @@ -0,0 +1,134 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap; + +import java.io.*; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.att.nsa.mr.client.MRConsumer; +import org.json.JSONObject; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapUtil; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRPublisher.message; + + +/** + *An example of how to use the Java publisher. + */ +public class SimpleExamplePublisher +{ + + public static void main(String []args) throws InterruptedException, Exception{ + int msgCount = 1; + SimpleExamplePublisher publisher = new SimpleExamplePublisher(); + + int i=0; + + String topicProducerPropFileName = DmaapUtil.createProducerPropFile("org.openecomp.appc.UNIT-TEST", null); + while (i< msgCount) + { + publisher.publishMessage(topicProducerPropFileName,i); + i++; + } + + fetchMessage(); + } + + + public void publishMessage( String producerFilePath,int count ) throws IOException, InterruptedException, Exception + { + // create our publisher + final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath); + // publish some messages + final JSONObject msg1 = new JSONObject (); + msg1.put ( "Partition:2", "Message:" +count); + //msg1.put ( "greeting", "Hello .." ); + + pub.send ( "2", msg1.toString()); + // close the publisher to make sure everything's sent before exiting. The batching + // publisher interface allows the app to get the set of unsent messages. It could + // write them to disk, for example, to try to send them later. + final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS ); + if ( stuck.size () > 0 ) + { + System.err.println ( stuck.size() + " messages unsent" ); + } + else + { + System.out.println ( "Clean exit; all messages sent." ); + } + } + + + public static void fetchMessage() + { + int count = 0; + + try + { + String topic = "org.openecomp.appc.UNIT-TEST"; + Properties props = new Properties(); + props.put("id", "1"); + props.put("group", "group1"); + String topicConsumerPropFileName1 = DmaapUtil.createConsumerPropFile(topic,props); + final MRConsumer consumer1 = MRClientFactory.createConsumer ( topicConsumerPropFileName1); + + props = new Properties(); + props.put("id", "2"); + props.put("group", "group2"); + String topicConsumerPropFileName2 = DmaapUtil.createConsumerPropFile(topic,props); + final MRConsumer consumer2 = MRClientFactory.createConsumer ( topicConsumerPropFileName2); + + for ( String msg : consumer1.fetch () ) + { + count++; + System.out.println ( "consumer1 "+count + ": " + msg ); + } + for ( String msg : consumer2.fetch () ) + { + count++; + System.out.println ( "consumer1 "+count + ": " + msg ); + } + + + } + catch ( Exception x ) + { + System.out.println("inside cons exc"); + System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); + } + } +} + + + + + + + + + diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/dmaap/TestAppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java index e97014198..6626938a7 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/dmaap/TestAppcDmaapAdapterActivator.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java @@ -19,13 +19,13 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.messaging.dmaap; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import org.junit.Test; -import org.openecomp.appc.adapter.dmaap.AppcDmaapAdapterActivator; +import org.openecomp.appc.adapter.messaging.dmaap.AppcDmaapAdapterActivator; public class TestAppcDmaapAdapterActivator { diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapConsuming.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapConsuming.java new file mode 100644 index 000000000..23647d61a --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapConsuming.java @@ -0,0 +1,84 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap; + + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.openecomp.appc.adapter.message.Consumer; +import org.openecomp.appc.adapter.messaging.dmaap.http.HttpDmaapConsumerImpl; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapConsumerImpl; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; +import org.junit.Ignore; + +import java.util.Arrays; +import java.util.List; + +/** + * Must have a DMaaP cluster or simulator up and running + * Update the hostname, topic, client properties in + * resources/org/openecomp/appc/default.properties + * + */ +public class TestDmaapConsuming { + + private static Consumer dmaapConsumer; + private static Consumer httpConsumer; + + @BeforeClass + public static void setUp() { + + Configuration configuration = ConfigurationFactory.getConfiguration(); + + List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(",")); + String topic = configuration.getProperty("topic.read"); + String consumerName = configuration.getProperty("client.name"); + String consumerId = configuration.getProperty("client.name.id"); + String msgFilter = configuration.getProperty("message.filter"); + String user = configuration.getProperty("dmaap.appc.username"); + String password = configuration.getProperty("dmaap.appc.password"); + + httpConsumer = new HttpDmaapConsumerImpl(hosts, topic, consumerName, consumerId, msgFilter); + dmaapConsumer = new DmaapConsumerImpl(hosts, topic, consumerName, consumerId,user,password,msgFilter); + } + + @Test + @Ignore + public void testHttpFetchMessages() { + testFetchMessages(httpConsumer); + } + + @Test + @Ignore + public void testFetchMessages() { + testFetchMessages(dmaapConsumer); + } + + private void testFetchMessages(Consumer consumer) { + List<String> messages = consumer.fetch(1000, 100); + Assert.assertNotNull(messages); + Assert.assertFalse(messages.isEmpty()); + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapEventSender.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapEventSender.java new file mode 100644 index 000000000..b35f9871d --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapEventSender.java @@ -0,0 +1,169 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap; + +import org.openecomp.sdnc.sli.SvcLogicContext; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.openecomp.appc.adapter.message.MessageDestination; +import org.openecomp.appc.adapter.message.Producer; +import org.openecomp.appc.adapter.message.event.EventHeader; +import org.openecomp.appc.adapter.message.event.EventMessage; +import org.openecomp.appc.adapter.message.event.EventStatus; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; +import org.openecomp.appc.adapter.messaging.dmaap.impl.EventSenderDmaapImpl; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; +import org.openecomp.appc.exceptions.APPCException; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + +public class TestDmaapEventSender { + + private static Properties props; + private static Map<String,Producer> producerMap = new HashMap<>(); + private static EventMessage eventMessage; + + @BeforeClass + public static void setUp() { + + Configuration configuration = ConfigurationFactory.getConfiguration(); // test.properties file placed in home dir. + + props = new Properties(); + props.setProperty(EventSenderDmaapImpl.EVENT_POOL_MEMBERS, + configuration.getProperty(EventSenderDmaapImpl.EVENT_POOL_MEMBERS) != null ? + configuration.getProperty(EventSenderDmaapImpl.EVENT_POOL_MEMBERS) : "member1,member2,member3"); + props.setProperty(EventSenderDmaapImpl.EVENT_TOPIC_WRITE, + configuration.getProperty(EventSenderDmaapImpl.EVENT_TOPIC_WRITE) != null ? + configuration.getProperty(EventSenderDmaapImpl.EVENT_TOPIC_WRITE) : "topic1"); + + String eventClientKey = configuration.getProperty(EventSenderDmaapImpl.DMAAP_USERNAME); + if (eventClientKey != null) { + props.setProperty(EventSenderDmaapImpl.DMAAP_USERNAME,eventClientKey); + } + String eventClientSecret = configuration.getProperty(EventSenderDmaapImpl.DMAAP_PASSWORD); + if (eventClientSecret != null) { + props.setProperty(EventSenderDmaapImpl.DMAAP_PASSWORD, eventClientSecret); + } + + Producer producer = Mockito.mock(DmaapProducerImpl.class); + producerMap.put(MessageDestination.DCAE.toString(),producer); + Mockito.when(producer.post(Matchers.anyString(), Matchers.anyString())).thenReturn(true); + + eventMessage = new EventMessage( + new EventHeader("2016-03-15T10:59:33.79Z", "1.01", "17"), + new EventStatus(404, "No krokodil found")); + } + + @Test + @Ignore // requires connection to a live DMaaP server + public void testDmaapEventSenderWithProperties() { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); + eventSender.initialize(); + eventSender.setProducerMap(producerMap); + Assert.assertTrue(eventSender.sendEvent(MessageDestination.DCAE, eventMessage)); + } + + @Test + public void testDmaapEventSenderWithNullProperties() { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); +// eventSender.initialize(); + eventSender.setProducerMap(producerMap); + Assert.assertTrue(eventSender.sendEvent(MessageDestination.DCAE, eventMessage)); + } + + /* + * This test runs agains a real Dmaap (or a simulator) that should be cofigured in test.properties file. + */ + @Test + @Ignore // requires connection to a live DMaaP server + public void testDmaapEventSenderWithDmaapSim() { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); + eventSender.initialize(); + Assert.assertTrue(eventSender.sendEvent(MessageDestination.DCAE, eventMessage)); + } + + + @Test + @Ignore // requires connection to a live DMaaP server + public void testDmaapEventSenderDG() throws APPCException { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); + eventSender.initialize(); + eventSender.setProducerMap(producerMap); + Map<String,String> params = new HashMap<>(); + + params.put("eventTime", eventMessage.getEventHeader().getEventTime()); + params.put("apiVer", eventMessage.getEventHeader().getApiVer()); + params.put("eventId", eventMessage.getEventHeader().getEventId()); + params.put("reason", eventMessage.getEventStatus().getReason()); + params.put("code", "200"); + + Assert.assertTrue(eventSender.sendEvent(MessageDestination.DCAE,params, new SvcLogicContext())); + } + + @Test(expected = APPCException.class) + @Ignore // requires connection to a live DMaaP server + public void testDmaapEventSenderDGNoParams() throws APPCException { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); + eventSender.initialize(); + eventSender.setProducerMap(producerMap); + Map<String,String> params = new HashMap<>(); + + Assert.assertFalse(eventSender.sendEvent(MessageDestination.DCAE,params, new SvcLogicContext())); + } + + + @Test(expected = APPCException.class) + @Ignore // requires connection to a live DMaaP server + public void testDmaapEventSenderDGNullParam() throws APPCException { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); + eventSender.initialize(); + eventSender.setProducerMap(producerMap); + Map<String,String> params = null; + + Assert.assertFalse(eventSender.sendEvent(MessageDestination.DCAE,params, new SvcLogicContext())); + } + + @Test(expected = APPCException.class) + @Ignore // requires connection to a live DMaaP server + public void testDmaapEventSenderDGNoParam() throws APPCException { + EventSenderDmaapImpl eventSender = new EventSenderDmaapImpl(); + eventSender.initialize(); + eventSender.setProducerMap(producerMap); + Map<String,String> params = new HashMap<>(); + +// params.put("apiVer", eventMessage.getEventHeader().getApiVer()); + params.put("eventId", eventMessage.getEventHeader().getEventId()); + params.put("reason", eventMessage.getEventStatus().getReason()); + params.put("code", "200"); + + eventSender.sendEvent(MessageDestination.DCAE,params, new SvcLogicContext()); + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapProducing.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapProducing.java new file mode 100644 index 000000000..b5b7c9538 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/TestDmaapProducing.java @@ -0,0 +1,80 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap; + + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.openecomp.appc.adapter.message.Producer; +import org.openecomp.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; + +import java.util.Arrays; +import java.util.List; + +/** + * Must have a DMaaP cluster or simulator up and running + * Update the hostname, topic, client properties in + * resources/org/openecomp/appc/default.properties + * + */ +public class TestDmaapProducing { + + private static Producer httpProducer; + private static Producer dmaapProducer; + + @BeforeClass + public static void setUp() { + + Configuration configuration = ConfigurationFactory.getConfiguration(); + + List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(",")); + String topic = configuration.getProperty("topic.write"); + String user = configuration.getProperty("dmaap.appc.username"); + String password = configuration.getProperty("dmaap.appc.password"); + + dmaapProducer = new DmaapProducerImpl(hosts, topic,user,password); + httpProducer = new HttpDmaapProducerImpl(hosts, topic); + httpProducer.updateCredentials(user,password); + } + + @Test + @Ignore + public void testHttpPostMessage() { + testPostMessage(httpProducer); + } + + @Test + @Ignore + public void testPostMessages() { + testPostMessage(dmaapProducer); + } + + private void testPostMessage(Producer producer) { + Assert.assertTrue(producer.post("partition", "{\"message\": \"Hello, world!\"}")); + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java new file mode 100644 index 000000000..4b936351a --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/openecomp/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java @@ -0,0 +1,242 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.messaging.dmaap.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.*; + +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.openecomp.appc.adapter.message.Consumer; +import org.openecomp.appc.adapter.message.Producer; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapConsumerImpl; +import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; + +public class TestConsumerProducerImpl { + + private Collection<String> urls; + private String topicRead; + private String topicWrite; + private String group; + private String groupId; + private String user; + private String password; + + @Before + public void setup() { + System.out.println("setup entry..."); +// urls = new HashSet<String>(); +// urls.add("dmaaphost1"); +// urls.add("dmaaphost2"); +// //remove unavailable dmaap instance for build +// //urls.add("dmaaphost3"); +// +// topicRead = "APPC-UNIT-TEST"; +// topicWrite = "APPC-UNIT-TEST"; +// group = "APPC-CLIENT"; +// groupId = "0"; + Configuration configuration = ConfigurationFactory.getConfiguration(); + List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(",")); + urls = new HashSet<String>(hosts); + topicRead = configuration.getProperty("topic.read"); + topicWrite = configuration.getProperty("topic.write"); + user = configuration.getProperty("dmaap.appc.username"); + password = configuration.getProperty("dmaap.appc.password"); + group = "APPC-CLIENT"; + groupId = "0"; + + + runoff(); + } + + /** + * Test that we can read and write and that the messages come back in order + */ + @Ignore + @Test + public void testWriteRead() { + System.out.println("testWriteRead entry..."); + Producer p = new DmaapProducerImpl(urls, topicWrite,user,password); + + String s1 = UUID.randomUUID().toString(); + String s2 = UUID.randomUUID().toString(); + if (p.post("TEST", s1) == false) { + // try again - 2nd attempt may succeed if cambria client failed over + p.post("TEST", s1); + } + if (p.post("TEST", s2) == false) { + // try again - 2nd attempt may succeed if cambria client failed over + p.post("TEST", s2); + } + + Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); + List<String> out = c.fetch(); + // if fetch is empty, try again - a 2nd attempt may succeed if + // cambria client has failed over + if ((out == null) || out.isEmpty()) { + out = c.fetch(); + } + + assertNotNull(out); + assertEquals(2, out.size()); + assertEquals(s1, out.get(0)); + assertEquals(s2, out.get(1)); + + } + + /** + * Test that we can read and write and that the messages come back in order + */ + @Test + @Ignore // Https Not support on jenkins server + public void testWriteReadHttps() { + System.out.println("testWriteReadHttps entry..."); + Producer p = new DmaapProducerImpl(urls, topicWrite,user,password); + p.useHttps(true); + + String s1 = UUID.randomUUID().toString(); + String s2 = UUID.randomUUID().toString(); + if (p.post("TEST", s1) == false) { + // try again - 2nd attempt may succeed if cambria client failed over + p.post("TEST", s1); + } + if (p.post("TEST", s2) == false) { + // try again - 2nd attempt may succeed if cambria client failed over + p.post("TEST", s2); + } + + Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); + c.useHttps(true); + + List<String> out = c.fetch(); + // if fetch is empty, try again - a 2nd attempt may succeed if + // cambria client has failed over + if ((out == null) || out.isEmpty()) { + out = c.fetch(); + } + + assertNotNull(out); + assertEquals(2, out.size()); + assertEquals(s1, out.get(0)); + assertEquals(s2, out.get(1)); + + } + + @Test + @Ignore // requires connection to a live DMaaP server + public void testBadUrl() { + System.out.println("testBadUrl entry..."); + urls.clear(); + urls.add("something.local"); + + // Producer p = new DmaapProducerImpl(urls, topicWrite); + Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); + List<String> result = c.fetch(1000, 1000); + + assertNotNull(result); + assertTrue(result.isEmpty()); + } + + @Test + @Ignore // requires connection to a live DMaaP server + public void testAuth() { + System.out.println("testAuth entry..."); + Producer p = new DmaapProducerImpl(urls, topicWrite,user,password); + Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); + + p.updateCredentials("key", "secret"); + c.updateCredentials("key", "secret"); + + // TODO - Do some protected dmaap queries when the apis are updated + } + + /** + * Test DMaaP client failover to another server when a bad url is encountered + + */ + @Ignore + @Test + public void testFailover() { + System.out.println("testFailover entry..."); + urls.clear(); + urls.add("openecomp2.org"); // bad url + urls.add("dmaaphost2"); + Producer p = new DmaapProducerImpl(urls, topicWrite,user,password); + + String s1 = UUID.randomUUID().toString(); + if (p.post("TEST", s1) == false) { + // try again - cambria client should have failed over + p.post("TEST", s1); + } + + urls.clear(); + urls.add("openecomp3.org"); // bad url + urls.add("dmaaphost3"); + + Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); + List<String> out = c.fetch(1000, 1000); + // if fetch is empty, try again - cambria client should have failed over + if ((out == null) || out.isEmpty()) { + out = c.fetch(); + } + + assertNotNull(out); + assertEquals(1, out.size()); + assertEquals(s1, out.get(0)); + } + + /** + * Reads through the entire topic so it is clean for testing. WARNING - ONLY USE ON TOPICS WHERE YOU ARE THE ONLY + * WRITER. Could end in an infinite loop otherwise. + */ + private void runoff() { + Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); + List<String> data; + do { + data = c.fetch(1000, 10000); + } while (!data.isEmpty() && data.size()!=1); + } + + @Test + @Ignore + public void testFilter() { + System.out.println("testFilter entry..."); + List<String> res; + String filter = "{\"class\":\"Assigned\",\"field\":\"request\"}"; + Consumer c = new DmaapConsumerImpl(urls, "DCAE-CLOSED-LOOP-EVENTS-DEV1510SIM", group, groupId,user,password,filter); + res = c.fetch(2000, 10); + assertFalse(res.isEmpty()); + + res.clear(); + filter = "{\"class\":\"Assigned\",\"field\":\"response\"}"; + c = new DmaapConsumerImpl(urls, "DCAE-CLOSED-LOOP-EVENTS-DEV1510SIM", group, groupId,user,password, filter); + res = c.fetch(2000, 10); + assertTrue(res.isEmpty()); + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-features/pom.xml b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-features/pom.xml index 2181888fc..8f5b55c73 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-features/pom.xml +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-features/pom.xml @@ -19,6 +19,12 @@ </dependency> <dependency> + <groupId>org.openecomp.appc</groupId> + <artifactId>appc-message-adapter-factory</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <scope>compile</scope> diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-features/src/main/resources/features.xml b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-features/src/main/resources/features.xml index 90a31629e..baf04ddec 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-features/src/main/resources/features.xml +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-features/src/main/resources/features.xml @@ -34,6 +34,8 @@ <feature version="${broker-mdsal.version}">odl-mdsal-broker</feature> <!-- <feature version="${sdnctl.sli.version}">sdnc-sli</feature> --> <bundle>mvn:org.openecomp.appc/appc-dmaap-adapter-bundle/${project.version}</bundle> + <bundle>mvn:org.openecomp.appc/appc-message-adapter-factory/${project.version}</bundle> + <bundle>mvn:org.openecomp.appc/appc-message-adapter-api/${project.version}</bundle> </feature> diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-installer/pom.xml b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-installer/pom.xml index 851c3a19e..845c477f4 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-installer/pom.xml +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-installer/pom.xml @@ -37,6 +37,16 @@ <artifactId>appc-dmaap-adapter-bundle</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.openecomp.appc</groupId> + <artifactId>appc-message-adapter-api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.openecomp.appc</groupId> + <artifactId>appc-message-adapter-factory</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> diff --git a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/pom.xml b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/pom.xml new file mode 100644 index 000000000..5e2e6c6b2 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/pom.xml @@ -0,0 +1,141 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.openecomp.appc</groupId> + <artifactId>appc-dmaap-adapter</artifactId> + <version>1.1.0-SNAPSHOT</version> + </parent> + + <artifactId>appc-message-adapter-api</artifactId> + <packaging>bundle</packaging> + <name>appc-message-adapter-api</name> + + <dependencies> + <dependency> + <groupId>org.openecomp.appc</groupId> + <artifactId>appc-metric-bundle</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.openecomp.appc</groupId> + <artifactId>appc-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>equinoxSDK381</groupId> + <artifactId>org.eclipse.osgi</artifactId> + </dependency> + + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + </dependency> + <dependency> + <groupId>org.objenesis</groupId> + <artifactId>objenesis</artifactId> + <version>2.2</version> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.openecomp.sdnc.core</groupId> + <artifactId>sli-common</artifactId> + <scope>compile</scope> + <!-- Added exclusion to prevent missing dependency issue on dblib --> + <exclusions> + <exclusion> + <groupId>org.openecomp.sdnc.core</groupId> + <artifactId>dblib-provider</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.openecomp.sdnc.core</groupId> + <artifactId>sli-provider</artifactId> + <scope>compile</scope> + <!-- Added exclusion to prevent missing dependency issue on dblib --> + <exclusions> + <exclusion> + <groupId>org.openecomp.sdnc.core</groupId> + <artifactId>dblib-provider</artifactId> + </exclusion> + </exclusions> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <extensions>true</extensions> + <configuration> + <instructions> + <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName> + <Bundle-Version>${project.version}</Bundle-Version> + <Export-Package>org.openecomp.appc.adapter.message.*</Export-Package> + <!--<Export-Serice>org.openecomp.appc.adapter.message.EventSender</Export-Serice>--> + <Import-Package>org.openecomp.appc.metricservice.*,com.att.nsa.*,org.openecomp.sdnc.sli.*,org.osgi.framework.*,!org.osgi.service.event.*,org.osgi.service.*,org.osgi.util.*,org.slf4j.*,com.vmware.*,org.apache.xerces.*,javax.net.ssl.*,org.xml.sax.*,javax.xml.*,javax.naming.*,javax.crypto.*, com.sun.jersey.spi.container.servlet,org.eclipse.jetty.servlets</Import-Package> + <!--<Embed-Dependency>appc-common</Embed-Dependency>--> + <Embed-Dependency>*;scope=compile|runtime;artifactId=!appc-metric-bundle|sli-common|sli-provider|org.eclipse.osgi|slf4j-api|jcl-over-slf4j|mysql-connector-java|xml-apis|pax-*</Embed-Dependency> + <Embed-Transitive>true</Embed-Transitive> + <Bundle-Blueprint>OSGI-INF/blueprint/blueprint.xml</Bundle-Blueprint> + </instructions> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/CallableConsumer.java index 7c282911d..bbc541167 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/CallableConsumer.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.message; import java.util.List; import java.util.concurrent.Callable; diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/Consumer.java index 32034e5fb..e99f884a2 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/Consumer.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.message; import java.util.List; @@ -54,9 +54,6 @@ public interface Consumer { */ public void updateCredentials(String apiKey, String apiSecret); - // TODO - Implement once Cambria allows you to set outside of constructor - // public void setFilter(String filter); - /** * Creates a dmaap client using a https connection * @@ -65,4 +62,9 @@ public interface Consumer { */ public void useHttps(boolean yes); + /** + * Closes the dmaap client https connection + */ + void close(); + } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/EventSender.java index 7d4a7c090..e4338f0e4 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/EventSender.java @@ -19,17 +19,18 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.message; -import java.util.Map; - -import org.openecomp.appc.adapter.dmaap.event.EventMessage; import org.openecomp.appc.exceptions.APPCException; import org.openecomp.sdnc.sli.SvcLogicContext; import org.openecomp.sdnc.sli.SvcLogicJavaPlugin; +import java.util.Map; + +import org.openecomp.appc.adapter.message.event.EventMessage; public interface EventSender extends SvcLogicJavaPlugin{ - boolean sendEvent(DmaapDestination destination, EventMessage msg); - boolean sendEvent(DmaapDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException; + boolean sendEvent(MessageDestination destination, EventMessage msg); + boolean sendEvent(MessageDestination destination, EventMessage msg,String eventTopicName); + boolean sendEvent(MessageDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException; } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/Manager.java index 183e618ba..2990036f5 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/Manager.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.message; import java.util.Set; diff --git a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/MessageAdapterFactory.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/MessageAdapterFactory.java new file mode 100644 index 000000000..741563b58 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/MessageAdapterFactory.java @@ -0,0 +1,39 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.appc.adapter.message; + +import java.util.Collection; +import java.util.Set; + +import org.openecomp.appc.adapter.message.Consumer; +import org.openecomp.appc.adapter.message.Producer; + +public interface MessageAdapterFactory { + + // TODO: how do you configure the MessageService type? + + public Producer createProducer(Collection<String> pools, String writeTopic, String apiKey, String apiSecret); + + public Producer createProducer(Collection<String> pools, Set<String> writeTopics, String apiKey, String apiSecret); + + public Consumer createConsumer(Collection<String> pool, String readTopic, + String clientName, String clientId, String filter_json, String apiKey, String apiSecret); +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/MessageDestination.java index efbe194ba..b952441e1 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/MessageDestination.java @@ -19,8 +19,8 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.message; -public enum DmaapDestination { +public enum MessageDestination { DCAE } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/Producer.java index f19c516be..5981606cb 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/Producer.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap; +package org.openecomp.appc.adapter.message; public interface Producer { @@ -43,4 +43,9 @@ public interface Producer { */ public void useHttps(boolean yes); + /** + * Closes the dmaap client https connection + */ + void close(); + } diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/event/EventHeader.java index dd951fe37..3d897d42a 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/event/EventHeader.java @@ -19,11 +19,10 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap.event; +package org.openecomp.appc.adapter.message.event; import com.fasterxml.jackson.annotation.JsonProperty; - public class EventHeader { @JsonProperty("eventTime") diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/event/EventMessage.java index af5cff2f9..e5fad6089 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/event/EventMessage.java @@ -19,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap.event; +package org.openecomp.appc.adapter.message.event; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; @@ -45,8 +45,6 @@ import java.io.Serializable; */ - - @JsonSerialize(include = Inclusion.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) public class EventMessage implements Serializable { diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/event/EventStatus.java index f5d7a59d4..85b4db02f 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/java/org/openecomp/appc/adapter/message/event/EventStatus.java @@ -19,11 +19,10 @@ * ============LICENSE_END========================================================= */ -package org.openecomp.appc.adapter.dmaap.event; +package org.openecomp.appc.adapter.message.event; import com.fasterxml.jackson.annotation.JsonProperty; - public class EventStatus { @JsonProperty("code") diff --git a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/resources/OSGI-INF/blueprint/blueprint.xml new file mode 100644 index 000000000..a1e5c7172 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ============LICENSE_START======================================================= + openECOMP : APP-C + ================================================================================ + Copyright (C) 2017 AT&T Intellectual Property. All rights + reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= + --> + + +<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd"> + +</blueprint> diff --git a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/resources/org/openecomp/appc/default.properties b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/resources/org/openecomp/appc/default.properties new file mode 100644 index 000000000..00c95bca1 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-api/src/main/resources/org/openecomp/appc/default.properties @@ -0,0 +1,23 @@ +### +# ============LICENSE_START======================================================= +# openECOMP : APP-C +# ================================================================================ +# Copyright (C) 2017 AT&T Intellectual Property. All rights +# reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +org.openecomp.appc.bootstrap.file=appc.properties +org.openecomp.appc.bootstrap.path=/opt/openecomp/appc/data/properties,${user.home},. diff --git a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/pom.xml b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/pom.xml new file mode 100644 index 000000000..21cba8179 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/pom.xml @@ -0,0 +1,150 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.openecomp.appc</groupId> + <artifactId>appc-dmaap-adapter</artifactId> + <version>1.1.0-SNAPSHOT</version> + </parent> + + <artifactId>appc-message-adapter-factory</artifactId> + <packaging>bundle</packaging> + <name>appc-message-adapter-factory</name> + + <dependencies> + <dependency> + <groupId>org.openecomp.appc</groupId> + <artifactId>appc-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.openecomp.appc</groupId> + <artifactId>appc-dmaap-adapter-bundle</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.openecomp.appc</groupId> + <artifactId>appc-message-adapter-api</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>equinoxSDK381</groupId> + <artifactId>org.eclipse.osgi</artifactId> + </dependency> + + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + </dependency> + <dependency> + <groupId>org.objenesis</groupId> + <artifactId>objenesis</artifactId> + <version>2.2</version> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.openecomp.sdnc.core</groupId> + <artifactId>sli-common</artifactId> + <scope>compile</scope> + <!-- Added exclusion to prevent missing dependency issue on dblib --> + <exclusions> + <exclusion> + <groupId>org.openecomp.sdnc.core</groupId> + <artifactId>dblib-provider</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.openecomp.sdnc.core</groupId> + <artifactId>sli-provider</artifactId> + <scope>compile</scope> + <!-- Added exclusion to prevent missing dependency issue on dblib --> + <exclusions> + <exclusion> + <groupId>org.openecomp.sdnc.core</groupId> + <artifactId>dblib-provider</artifactId> + </exclusion> + </exclusions> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <extensions>true</extensions> + <configuration> + <instructions> + <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName> + <Bundle-Version>${project.version}</Bundle-Version> + <Export-Package>org.openecomp.appc.adapter.factory</Export-Package> + <Bundle-Activator>org.openecomp.appc.adapter.factory.DmaapMessageAdapterFactoryActivator</Bundle-Activator> + <Export-Service>org.openecomp.appc.adapter.message.MessageAdapterFactory</Export-Service> + <Import-Package>org.openecomp.appc.adapter.messaging.*,org.openecomp.appc.adapter.message.*,org.openecomp.appc.metricservice.*,com.att.nsa.*org.openecomp.sdnc.core.sli.*,org.osgi.framework.*,!org.osgi.service.event.*,org.osgi.service.*,org.osgi.util.*,org.slf4j.*,com.vmware.*,org.apache.xerces.*,javax.net.ssl.*,org.xml.sax.*,javax.xml.*,javax.naming.*,javax.crypto.*, com.sun.jersey.spi.container.servlet,org.eclipse.jetty.servlets</Import-Package> + <Embed-Dependency>*;scope=compile|runtime;artifactId=!appc-metric-bundle|sli-common|org.eclipse.osgi|slf4j-api|jcl-over-slf4j|mysql-connector-java|xml-apis|pax-*</Embed-Dependency> + <Embed-Transitive>true</Embed-Transitive> + <Bundle-Blueprint>OSGI-INF/blueprint/blueprint.xml</Bundle-Blueprint> + </instructions> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/java/org/openecomp/appc/adapter/factory/DmaapMessageAdapterFactoryActivator.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/java/org/openecomp/appc/adapter/factory/DmaapMessageAdapterFactoryActivator.java new file mode 100644 index 000000000..26e0f3da9 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/java/org/openecomp/appc/adapter/factory/DmaapMessageAdapterFactoryActivator.java @@ -0,0 +1,44 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.appc.adapter.factory; + +import org.openecomp.appc.adapter.message.MessageAdapterFactory; +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceRegistration; + +public class DmaapMessageAdapterFactoryActivator implements BundleActivator { + private ServiceRegistration registration; + + @Override + public void start(BundleContext context) throws Exception { + registration = context.registerService( + MessageAdapterFactory.class.getName(), + new DmaapMessageAdapterFactoryImpl(), + null); + } + + @Override + public void stop(BundleContext context) throws Exception { + registration.unregister(); + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/java/org/openecomp/appc/adapter/factory/DmaapMessageAdapterFactoryImpl.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/java/org/openecomp/appc/adapter/factory/DmaapMessageAdapterFactoryImpl.java new file mode 100644 index 000000000..604cbf738 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/java/org/openecomp/appc/adapter/factory/DmaapMessageAdapterFactoryImpl.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.factory; + +import java.util.Collection; +import java.util.Set; + +import org.openecomp.appc.adapter.message.Consumer; +import org.openecomp.appc.adapter.message.MessageAdapterFactory; +import org.openecomp.appc.adapter.message.Producer; +import org.openecomp.appc.adapter.messaging.dmaap.http.HttpDmaapConsumerImpl; +import org.openecomp.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl; + +public class DmaapMessageAdapterFactoryImpl implements MessageAdapterFactory { + + public Producer createProducer(Collection<String> pools, String writeTopic, String apiKey, String apiSecret) { + return new HttpDmaapProducerImpl(pools, writeTopic); + } + + public Producer createProducer(Collection<String> pools, Set<String> writeTopics, String apiKey, String apiSecret) { + return new HttpDmaapProducerImpl(pools, writeTopics); + } + + public Consumer createConsumer(Collection<String> pool, String readTopic, + String clientName, String clientId, String filter_json, String apiKey, String apiSecret) { + return new HttpDmaapConsumerImpl(pool, readTopic, clientName, clientId, apiKey, apiSecret, filter_json); + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/java/org/openecomp/appc/adapter/factory/MessageService.java b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/java/org/openecomp/appc/adapter/factory/MessageService.java new file mode 100644 index 000000000..5bbfd6ddb --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/java/org/openecomp/appc/adapter/factory/MessageService.java @@ -0,0 +1,56 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.factory; +/** + * The message service types that are available. Only DMaaP available + **/ +public enum MessageService { + DMaaP("dmaap"); + + private String val; + + private MessageService(String val) { + this.val = val; + } + + public String getValue() { + return val; + } + + /** + * Tries to match a string to a MessageService. If no match is found, returns the default (DMaaP) + * + * @param input + * the string to try and match + * @return A MessasgeService + */ + public static MessageService parse(String input) { + if (input != null) { + for (MessageService ms : MessageService.values()) { + if (ms.getValue().equals(input.toLowerCase())) { + return ms; + } + } + } + return MessageService.DMaaP; // Default + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/resources/OSGI-INF/blueprint/blueprint.xml new file mode 100644 index 000000000..a1e5c7172 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ============LICENSE_START======================================================= + openECOMP : APP-C + ================================================================================ + Copyright (C) 2017 AT&T Intellectual Property. All rights + reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= + --> + + +<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd"> + +</blueprint> diff --git a/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/resources/org/openecomp/appc/default.properties b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/resources/org/openecomp/appc/default.properties new file mode 100644 index 000000000..00c95bca1 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-message-adapter-factory/src/main/resources/org/openecomp/appc/default.properties @@ -0,0 +1,23 @@ +### +# ============LICENSE_START======================================================= +# openECOMP : APP-C +# ================================================================================ +# Copyright (C) 2017 AT&T Intellectual Property. All rights +# reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +org.openecomp.appc.bootstrap.file=appc.properties +org.openecomp.appc.bootstrap.path=/opt/openecomp/appc/data/properties,${user.home},. diff --git a/appc-adapters/appc-dmaap-adapter/pom.xml b/appc-adapters/appc-dmaap-adapter/pom.xml index 5d90baacd..212488a2c 100644 --- a/appc-adapters/appc-dmaap-adapter/pom.xml +++ b/appc-adapters/appc-dmaap-adapter/pom.xml @@ -9,7 +9,7 @@ <artifactId>appc-dmaap-adapter</artifactId> <name>DMaaP Adapter</name> - <description>Adapter to read and write messages on the Universal Event Broker (Cambria).</description> + <description>Adapter to read and write messages on the DMaaP Service</description> <packaging>pom</packaging> <reporting> @@ -98,8 +98,10 @@ </reporting> <modules> + <module>appc-message-adapter-api</module> <module>appc-dmaap-adapter-bundle</module> <module>appc-dmaap-adapter-features</module> <module>appc-dmaap-adapter-installer</module> + <module>appc-message-adapter-factory</module> </modules> </project> |