diff options
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle')
13 files changed, 28 insertions, 1290 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/pom.xml b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/pom.xml index 188d3e764..a40453dee 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/pom.xml +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/pom.xml @@ -33,6 +33,10 @@ <name>DMaaP Adapter - bundle</name> <dependencies> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </dependency> <dependency> <groupId>org.onap.appc</groupId> <artifactId>appc-message-adapter-api</artifactId> @@ -162,14 +166,6 @@ </exclusions> </dependency> - <!-- DMaaP Client --> - <dependency> - <groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId> - <artifactId>dmaapClient</artifactId> - <version>${dmaap.client.version}</version> -<!-- <classifier>jar-with-dependencies</classifier> --> - </dependency> - <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> @@ -178,6 +174,11 @@ <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> + + <dependency> + <groupId>javax.ws.rs</groupId> + <artifactId>javax.ws.rs-api</artifactId> + </dependency> </dependencies> <build> diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java index a9af8c702..84b5a5ff6 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java @@ -50,6 +50,11 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer //for test purposes } + public HttpDmaapProducerImpl(Collection<String> urls, String topicName, String username, String password) { + this(urls, topicName); + updateCredentials(username, password); + } + public HttpDmaapProducerImpl(Collection<String> urls, String topicName) { topics = new HashSet<>(); topics.add(topicName); diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java deleted file mode 100644 index 30455201e..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java +++ /dev/null @@ -1,244 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ================================================================================ - * Modifications Copyright (C) 2018 IBM - * ================================================================================ - * Modifications Copyright (C) 2019 Ericsson - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.impl; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.MRConsumer; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Properties; -import org.apache.commons.lang3.StringUtils; -import org.onap.appc.adapter.message.Consumer; -import org.onap.appc.adapter.messaging.dmaap.utils.DmaapUtil; -import org.onap.appc.configuration.Configuration; -import org.onap.appc.configuration.ConfigurationFactory; -import org.onap.appc.metricservice.MetricRegistry; -import org.onap.appc.metricservice.MetricService; -import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric; -import org.onap.appc.metricservice.metric.Metric; -import org.onap.appc.metricservice.metric.MetricType; -import org.onap.appc.metricservice.policy.PublishingPolicy; -import org.onap.appc.metricservice.publisher.LogPublisher; -import org.osgi.framework.BundleContext; -import org.osgi.framework.FrameworkUtil; -import org.osgi.framework.ServiceReference; - -public class DmaapConsumerImpl implements Consumer { - - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class); - private Configuration configuration = ConfigurationFactory.getConfiguration(); - // Default values - private static final int DEFAULT_TIMEOUT_MS = 60000; - private static final int DEFAULT_LIMIT = 1000; - private String topic; - private boolean isMetricEnabled = false; - private boolean useHttps = false; - private MetricRegistry metricRegistry; - private MRConsumer client = null; - private Properties props = null; - - public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId, - String user, String password) { - - this(urls, topicName, consumerGroupName, consumerId, user, password, null); - } - - public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId, - String user, String password, String filter) { - - this.topic = topicName; - this.props = new Properties(); - String urlsStr = StringUtils.join(urls, ','); - props.setProperty("host", urlsStr); - props.setProperty("group", consumerGroupName); - props.setProperty("id", consumerId); - if (user != null && password != null) { - props.setProperty("username", user); - props.setProperty("password", password); - } else { - props.setProperty("TransportType", "HTTPNOAUTH"); - } - - if (filter != null) { - props.setProperty("filter", filter); - } - } - - private void initMetric() { - LOG.debug("Metric getting initialized"); - MetricService metricService = getMetricService(); - if (metricService != null) { - metricRegistry = metricService.createRegistry("APPC"); - - DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory() - .dmaapRequestCounterBuilder().withName("DMAAP_KPI").withType(MetricType.COUNTER) - .withRecievedMessage(0).withPublishedMessage(0).build(); - - if (metricRegistry.register(dmaapKpiMetric)) { - Metric[] metrics = new Metric[] { dmaapKpiMetric }; - LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); - LogPublisher[] logPublishers = new LogPublisher[1]; - logPublishers[0] = logPublisher; - - PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory() - .scheduledPolicyBuilder() - .withPublishers(logPublishers) - .withMetrics(metrics) - .build(); - - LOG.debug("Policy getting initialized"); - manuallyScheduledPublishingPolicy.init(); - LOG.debug("Metric initialized"); - } - } - } - - /** - * @return An instance of MRConsumer created from our class variables. - */ - synchronized MRConsumer getClient(int waitMs, int limit) { - try { - props.setProperty("timeout", String.valueOf(waitMs)); - props.setProperty("limit", String.valueOf(limit)); - String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic, props); - return MRClientFactory.createConsumer(topicProducerPropFileName); - } catch (IOException e1) { - LOG.error("failed to createConsumer", e1); - return null; - } - } - - @Override - public synchronized void updateCredentials(String key, String secret) { - LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); - props.setProperty("username", String.valueOf(key)); - props.setProperty("password", String.valueOf(secret)); - client = null; - } - - @Override - public List<String> fetch() { - return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); - } - - @Override - public List<String> fetch(int waitMs, int limit) { - Properties properties = configuration.getProperties(); - if (properties != null && properties.getProperty("metric.enabled") != null) { - isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled")); - } - if (isMetricEnabled) { - initMetric(); - } - LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); - List<String> out = new ArrayList<>(); - - // Create client once and reuse it on subsequent fetches. This is - // to support failover to other servers in the DMaaP cluster. - if (client == null) { - LOG.info("Getting DMaaP Client ..."); - client = getClient(waitMs, limit); - } - if (client != null) { - try { - for (String s : client.fetch(waitMs, limit)) { - out.add(s); - incrementReceivedMessage(); - } - LOG.debug(String.format("Got %d records from %s", out.size(), this.toString())); - } catch (Exception e) { - // Connection exception - LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage()), e); - try { - LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs)); - Thread.sleep(waitMs); - } catch (InterruptedException e2) { - LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs)); - Thread.currentThread().interrupt(); - } - } - } - return out; - } - - private void incrementReceivedMessage() { - if (isMetricEnabled && metricRegistry != null) { - ((DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage(); - } - } - - /** - * Close consumer Dmaap client. - */ - @Override - public void close() { - LOG.debug("Closing Dmaap consumer client...."); - if (client != null) { - client.close(); - } - } - - @Override - public String toString() { - String hostStr = (props == null || props.getProperty("host") == null ? "N/A" : props.getProperty("host")); - String group = (props == null || props.getProperty("group") == null ? "N/A" : props.getProperty("group")); - String id = (props == null || props.getProperty("id") == null ? "N/A" : props.getProperty("id")); - return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr); - } - - @Override - public void useHttps(boolean yes) { - useHttps = yes; - } - - protected MetricService getMetricService() { - BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); - ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); - if (sref != null) { - LOG.info("Metric Service from bundlecontext"); - return (MetricService) bctx.getService(sref); - } else { - LOG.info("Metric Service error from bundlecontext"); - LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService"); - return null; - } - } - - public Properties getProperties() { - return props; - } - - public boolean isHttps() { - return useHttps; - } - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java deleted file mode 100644 index efff18ed9..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java +++ /dev/null @@ -1,228 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Modifications Copyright (C) 2019 IBM - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.impl; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.att.nsa.mr.client.MRBatchingPublisher; -import com.att.nsa.mr.client.MRClientFactory; -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; -import org.onap.appc.adapter.message.Producer; -import org.onap.appc.adapter.messaging.dmaap.utils.DmaapUtil; -import org.onap.appc.configuration.Configuration; -import org.onap.appc.configuration.ConfigurationFactory; -import org.onap.appc.metricservice.MetricRegistry; -import org.onap.appc.metricservice.MetricService; -import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric; -import org.onap.appc.metricservice.metric.Metric; -import org.onap.appc.metricservice.metric.MetricType; -import org.onap.appc.metricservice.policy.PublishingPolicy; -import org.onap.appc.metricservice.publisher.LogPublisher; -import org.osgi.framework.BundleContext; -import org.osgi.framework.FrameworkUtil; -import org.osgi.framework.ServiceReference; - -public class DmaapProducerImpl implements Producer { - - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class); - private static final Configuration configuration = ConfigurationFactory.getConfiguration(); - - private Set<String> topics; - - private Properties props = null; - private MetricRegistry metricRegistry; - private boolean useHttps = false; - private boolean isMetricEnabled = false; - - private Set<MRBatchingPublisher> clients; - - public DmaapProducerImpl(Collection<String> urls, String topicName, String user, String password) { - this(urls, (Set<String>) null, user, password); - this.topics = new HashSet<>(); - if (topicName != null) { - Collections.addAll(topics, topicName.split(",")); - } - } - - public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) { - topics = topicNames; - if (urls == null) { - throw new IllegalArgumentException("Mandaory argument is null: urls"); - } - this.props = new Properties(); - String urlsStr = StringUtils.join(urls, ','); - props.setProperty("host", urlsStr); - props.setProperty("id", UUID.randomUUID().toString()); - if (user != null && password != null) { - props.setProperty("username", user); - props.setProperty("password", password); - } else { - props.setProperty("TransportType", "HTTPNOAUTH"); - } - } - - private void initMetric() { - LOG.debug("Metric getting initialized"); - MetricService metricService = getMetricservice(); - if (metricService != null) { - metricRegistry = metricService.createRegistry("APPC"); - - DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory() - .dmaapRequestCounterBuilder().withName("DMAAP_KPI").withType(MetricType.COUNTER) - .withRecievedMessage(0).withPublishedMessage(0).build(); - - if (metricRegistry.register(dmaapKpiMetric)) { - Metric[] metrics = new Metric[] { dmaapKpiMetric }; - LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); - LogPublisher[] logPublishers = new LogPublisher[1]; - logPublishers[0] = logPublisher; - - PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory() - .scheduledPolicyBuilder().withPublishers(logPublishers).withMetrics(metrics).build(); - - LOG.debug("Policy getting initialized"); - manuallyScheduledPublishingPolicy.init(); - LOG.debug("Metric initialized"); - } - } - } - - private Set<MRBatchingPublisher> getClients() { - Set<MRBatchingPublisher> out = new HashSet<>(); - for (String topic : topics) { - try { - String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic, props); - final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher(topicProducerPropFileName); - out.add(client); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - return out; - } - - @Override - public synchronized void updateCredentials(String key, String secret) { - LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); - props.setProperty("username", String.valueOf(key)); - props.setProperty("password", String.valueOf(secret)); - clients = null; - } - - @Override - public boolean post(String partition, String data) { - LOG.debug("In DmaapProducerImpl.post()"); - boolean success = true; - Properties properties = configuration.getProperties(); - if (properties != null && properties.getProperty("metric.enabled") != null) { - isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled")); - } - if (isMetricEnabled) { - initMetric(); - } - - // Create clients once and reuse them on subsequent posts. This is - // to support failover to other servers in the Dmaap cluster. - if ((clients == null) || (clients.isEmpty())) { - LOG.info("Getting CambriaBatchingPublisher Clients ..."); - clients = getClients(); - } - LOG.debug("In DmaapProducerImpl.post()::: before sending to clients"); - for (MRBatchingPublisher client : clients) { - try { - LOG.debug(String.format("Posting %s to %s", data, client)); - client.send(partition, data); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - success = false; - } - } - incrementPublishedMessage(); - return success; - } - - private void incrementPublishedMessage() { - if (isMetricEnabled && metricRegistry != null) { - ((DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage(); - } - } - - /** - * Close producer Dmaap client. - */ - @Override - public void close() { - if ((clients == null) || (clients.isEmpty())) { - return; - } - LOG.debug("Closing Dmaap producer clients...."); - for (MRBatchingPublisher client : clients) { - try { - client.close(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client), e); - Thread.currentThread().interrupt(); - } catch (IOException e) { - LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client), e); - } - } - } - - @Override - public void useHttps(boolean yes) { - useHttps = yes; - } - - private MetricService getMetricservice() { - BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); - ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); - if (sref != null) { - LOG.info("Metric Service from bundlecontext"); - return (MetricService) bctx.getService(sref); - } else { - LOG.info("Metric Service error from bundlecontext"); - LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService"); - return null; - } - } - - public Properties getProperties() { - return props; - } - - public boolean isHttps() { - return useHttps; - } - -}
\ No newline at end of file diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java index 7d8bc763a..bd4c7f08d 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java @@ -34,6 +34,7 @@ import org.onap.appc.adapter.message.Producer; import org.onap.appc.adapter.message.event.EventHeader; import org.onap.appc.adapter.message.event.EventMessage; import org.onap.appc.adapter.message.event.EventStatus; +import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl; import org.onap.appc.configuration.Configuration; import org.onap.appc.configuration.ConfigurationFactory; import org.onap.appc.exceptions.APPCException; @@ -91,7 +92,7 @@ public class EventSenderDmaapImpl implements EventSender LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE)); LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); - Producer producer = new DmaapProducerImpl(pool, writeTopic,username, password); + Producer producer = new HttpDmaapProducerImpl(pool, writeTopic,username, password); for (String url : pool) { if (url.contains("3905") || url.contains("https")) { @@ -138,7 +139,7 @@ public class EventSenderDmaapImpl implements EventSender LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); LOG.debug(String.format("writeTopic = %s, taken from property: %s", eventTopicName, destination + "." + EVENT_TOPIC_WRITE)); LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); - Producer producer = new DmaapProducerImpl(pool, eventTopicName, username, password); + Producer producer = new HttpDmaapProducerImpl(pool, eventTopicName, username, password); for (String url : pool) { if (url.contains("3905") || url.contains("https")) { diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java deleted file mode 100644 index 3d50ea588..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java +++ /dev/null @@ -1,136 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap; - -import java.io.*; -import java.util.List; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import com.att.nsa.mr.client.MRConsumer; -import org.json.JSONObject; -import org.onap.appc.adapter.messaging.dmaap.utils.DmaapUtil; - -import com.att.nsa.mr.client.MRBatchingPublisher; -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.MRPublisher.message; - - -/** - *An example of how to use the Java publisher. - */ -public class SimpleExamplePublisher -{ - - public static void main(String []args) throws InterruptedException, Exception{ - int msgCount = 1; - SimpleExamplePublisher publisher = new SimpleExamplePublisher(); - - int i=0; - - String topicProducerPropFileName = DmaapUtil.createProducerPropFile("org.onap.appc.UNIT-TEST", null); - while (i< msgCount) - { - publisher.publishMessage(topicProducerPropFileName,i); - i++; - } - - fetchMessage(); - } - - - public void publishMessage( String producerFilePath,int count ) throws IOException, InterruptedException, Exception - { - // create our publisher - final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath); - // publish some messages - final JSONObject msg1 = new JSONObject (); - msg1.put ( "Partition:2", "Message:" +count); - //msg1.put ( "greeting", "Hello .." ); - - pub.send ( "2", msg1.toString()); - // close the publisher to make sure everything's sent before exiting. The batching - // publisher interface allows the app to get the set of unsent messages. It could - // write them to disk, for example, to try to send them later. - final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS ); - if ( stuck.size () > 0 ) - { - System.err.println ( stuck.size() + " messages unsent" ); - } - else - { - System.out.println ( "Clean exit; all messages sent." ); - } - } - - - public static void fetchMessage() - { - int count = 0; - - try - { - String topic = "org.onap.appc.UNIT-TEST"; - Properties props = new Properties(); - props.put("id", "1"); - props.put("group", "group1"); - String topicConsumerPropFileName1 = DmaapUtil.createConsumerPropFile(topic,props); - final MRConsumer consumer1 = MRClientFactory.createConsumer ( topicConsumerPropFileName1); - - props = new Properties(); - props.put("id", "2"); - props.put("group", "group2"); - String topicConsumerPropFileName2 = DmaapUtil.createConsumerPropFile(topic,props); - final MRConsumer consumer2 = MRClientFactory.createConsumer ( topicConsumerPropFileName2); - - for ( String msg : consumer1.fetch () ) - { - count++; - System.out.println ( "consumer1 "+count + ": " + msg ); - } - for ( String msg : consumer2.fetch () ) - { - count++; - System.out.println ( "consumer1 "+count + ": " + msg ); - } - - - } - catch ( Exception x ) - { - System.out.println("inside cons exc"); - System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); - } - } -} - - - - - - - - - diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java index b669dcdca..ec9740053 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java @@ -29,7 +29,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.onap.appc.adapter.message.Consumer; import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapConsumerImpl; -import org.onap.appc.adapter.messaging.dmaap.impl.DmaapConsumerImpl; import org.onap.appc.configuration.Configuration; import org.onap.appc.configuration.ConfigurationFactory; import org.junit.Ignore; @@ -45,7 +44,6 @@ import java.util.List; */ public class TestDmaapConsuming { - private static Consumer dmaapConsumer; private static Consumer httpConsumer; @BeforeClass @@ -62,7 +60,6 @@ public class TestDmaapConsuming { String password = configuration.getProperty("dmaap.appc.password"); httpConsumer = new HttpDmaapConsumerImpl(hosts, topic, consumerName, consumerId, msgFilter); - dmaapConsumer = new DmaapConsumerImpl(hosts, topic, consumerName, consumerId,user,password,msgFilter); } @Test @@ -74,7 +71,7 @@ public class TestDmaapConsuming { @Test @Ignore public void testFetchMessages() { - testFetchMessages(dmaapConsumer); + testFetchMessages(httpConsumer); } private void testFetchMessages(Consumer consumer) { diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapEventSender.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapEventSender.java index efdc2beda..c6bf6671d 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapEventSender.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapEventSender.java @@ -35,7 +35,7 @@ import org.onap.appc.adapter.message.Producer; import org.onap.appc.adapter.message.event.EventHeader; import org.onap.appc.adapter.message.event.EventMessage; import org.onap.appc.adapter.message.event.EventStatus; -import org.onap.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; +import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl; import org.onap.appc.adapter.messaging.dmaap.impl.EventSenderDmaapImpl; import org.onap.appc.configuration.Configuration; import org.onap.appc.configuration.ConfigurationFactory; @@ -74,7 +74,7 @@ public class TestDmaapEventSender { props.setProperty(EventSenderDmaapImpl.DMAAP_PASSWORD, eventClientSecret); } - Producer producer = Mockito.mock(DmaapProducerImpl.class); + Producer producer = Mockito.mock(HttpDmaapProducerImpl.class); producerMap.put(MessageDestination.DCAE.toString(),producer); Mockito.when(producer.post(Matchers.anyString(), Matchers.anyString())).thenReturn(true); diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java index 64e7c5358..5b2183e37 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java @@ -30,7 +30,6 @@ import org.junit.Ignore; import org.junit.Test; import org.onap.appc.adapter.message.Producer; import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl; -import org.onap.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; import org.onap.appc.configuration.Configuration; import org.onap.appc.configuration.ConfigurationFactory; @@ -46,7 +45,6 @@ import java.util.List; public class TestDmaapProducing { private static Producer httpProducer; - private static Producer dmaapProducer; @BeforeClass public static void setUp() { @@ -58,7 +56,6 @@ public class TestDmaapProducing { String user = configuration.getProperty("dmaap.appc.username"); String password = configuration.getProperty("dmaap.appc.password"); - dmaapProducer = new DmaapProducerImpl(hosts, topic,user,password); httpProducer = new HttpDmaapProducerImpl(hosts, topic); httpProducer.updateCredentials(user,password); } @@ -72,7 +69,7 @@ public class TestDmaapProducing { @Test @Ignore public void testPostMessages() { - testPostMessage(dmaapProducer); + testPostMessage(httpProducer); } private void testPostMessage(Producer producer) { diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImplTest.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImplTest.java index 76e56d85c..eaad4b844 100644 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImplTest.java +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImplTest.java @@ -37,6 +37,7 @@ import org.onap.appc.adapter.message.MessageDestination; import org.onap.appc.adapter.message.Producer; import org.onap.appc.adapter.message.event.EventHeader; import org.onap.appc.adapter.message.event.EventMessage; +import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl; import org.onap.appc.configuration.Configuration; import org.onap.appc.configuration.ConfigurationFactory; import org.onap.appc.exceptions.APPCException; @@ -76,7 +77,12 @@ public class EventSenderDmaapImplTest { EventHeader eventHeader = Mockito.mock(EventHeader.class); Mockito.when(eventHeader.getEventId()).thenReturn("EVENT_ID"); Mockito.when(eventMessage.getEventHeader()).thenReturn(eventHeader); - assertTrue(sender.sendEvent(MessageDestination.DCAE, eventMessage, "TOPIC NAME")); + Producer mockProducer = Mockito.mock(HttpDmaapProducerImpl.class); + Mockito.when(mockProducer.post(Mockito.anyString(), Mockito.anyString())).thenReturn(true); + Map<String,Producer> map = new HashMap<>(); + map.put(MessageDestination.DCAE.toString(),mockProducer); + sender.setProducerMap(map); + assertTrue(sender.sendEvent(MessageDestination.DCAE, eventMessage)); } @Test diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java deleted file mode 100644 index 9df2f070e..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ================================================================================ - * Modifications Copyright (C) 2019 Ericsson - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.impl; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.UUID; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.onap.appc.adapter.message.Consumer; -import org.onap.appc.adapter.message.Producer; -import org.onap.appc.configuration.Configuration; -import org.onap.appc.configuration.ConfigurationFactory; - - -public class TestConsumerProducerImpl { - - private Collection<String> urls; - private String topicRead; - private String topicWrite; - private String group; - private String groupId; - private String user; - private String password; - - @Before - public void setup() { - System.out.println("setup entry..."); - // urls = new HashSet<String>(); - // urls.add("dmaaphost1"); - // urls.add("dmaaphost2"); - // //remove unavailable dmaap instance for build - // //urls.add("dmaaphost3"); - // - // topicRead = "APPC-UNIT-TEST"; - // topicWrite = "APPC-UNIT-TEST"; - // group = "APPC-CLIENT"; - // groupId = "0"; - Configuration configuration = ConfigurationFactory.getConfiguration(); - List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(",")); - urls = new HashSet<String>(hosts); - topicRead = configuration.getProperty("topic.read"); - topicWrite = configuration.getProperty("topic.write"); - user = configuration.getProperty("dmaap.appc.username"); - password = configuration.getProperty("dmaap.appc.password"); - group = "APPC-CLIENT"; - groupId = "0"; - - - runoff(); - } - - /** - * Test that we can read and write and that the messages come back in order - */ - @Ignore - @Test - public void testWriteRead() { - System.out.println("testWriteRead entry..."); - Producer p = new DmaapProducerImpl(urls, topicWrite,user,password); - - String s1 = UUID.randomUUID().toString(); - String s2 = UUID.randomUUID().toString(); - if (p.post("TEST", s1) == false) { - // try again - 2nd attempt may succeed if cambria client failed over - p.post("TEST", s1); - } - if (p.post("TEST", s2) == false) { - // try again - 2nd attempt may succeed if cambria client failed over - p.post("TEST", s2); - } - - Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); - List<String> out = c.fetch(); - // if fetch is empty, try again - a 2nd attempt may succeed if - // cambria client has failed over - if ((out == null) || out.isEmpty()) { - out = c.fetch(); - } - - assertNotNull(out); - assertEquals(2, out.size()); - assertEquals(s1, out.get(0)); - assertEquals(s2, out.get(1)); - - } - - /** - * Test that we can read and write and that the messages come back in order - */ - @Test - @Ignore // Https Not support on jenkins server - public void testWriteReadHttps() { - System.out.println("testWriteReadHttps entry..."); - Producer p = new DmaapProducerImpl(urls, topicWrite,user,password); - p.useHttps(true); - - String s1 = UUID.randomUUID().toString(); - String s2 = UUID.randomUUID().toString(); - if (p.post("TEST", s1) == false) { - // try again - 2nd attempt may succeed if cambria client failed over - p.post("TEST", s1); - } - if (p.post("TEST", s2) == false) { - // try again - 2nd attempt may succeed if cambria client failed over - p.post("TEST", s2); - } - - Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); - c.useHttps(true); - - List<String> out = c.fetch(); - // if fetch is empty, try again - a 2nd attempt may succeed if - // cambria client has failed over - if ((out == null) || out.isEmpty()) { - out = c.fetch(); - } - - assertNotNull(out); - assertEquals(2, out.size()); - assertEquals(s1, out.get(0)); - assertEquals(s2, out.get(1)); - - } - - @Test - @Ignore // requires connection to a live DMaaP server - public void testBadUrl() { - System.out.println("testBadUrl entry..."); - urls.clear(); - urls.add("something.local"); - - // Producer p = new DmaapProducerImpl(urls, topicWrite); - Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); - List<String> result = c.fetch(1000, 1000); - - assertNotNull(result); - assertTrue(result.isEmpty()); - } - - @Test - @Ignore // requires connection to a live DMaaP server - public void testAuth() { - System.out.println("testAuth entry..."); - Producer p = new DmaapProducerImpl(urls, topicWrite,user,password); - Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); - - p.updateCredentials("key", "secret"); - c.updateCredentials("key", "secret"); - - // TODO - Do some protected dmaap queries when the apis are updated - } - - /** - * Test DMaaP client failover to another server when a bad url is encountered - - */ - @Ignore - @Test - public void testFailover() { - System.out.println("testFailover entry..."); - urls.clear(); - urls.add("openecomp2.org"); // bad url - urls.add("dmaaphost2"); - Producer p = new DmaapProducerImpl(urls, topicWrite,user,password); - - String s1 = UUID.randomUUID().toString(); - if (p.post("TEST", s1) == false) { - // try again - cambria client should have failed over - p.post("TEST", s1); - } - - urls.clear(); - urls.add("openecomp3.org"); // bad url - urls.add("dmaaphost3"); - - Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); - List<String> out = c.fetch(1000, 1000); - // if fetch is empty, try again - cambria client should have failed over - if ((out == null) || out.isEmpty()) { - out = c.fetch(); - } - - assertNotNull(out); - assertEquals(1, out.size()); - assertEquals(s1, out.get(0)); - } - - /** - * Reads through the entire topic so it is clean for testing. WARNING - ONLY USE ON TOPICS WHERE YOU ARE THE ONLY - * WRITER. Could end in an infinite loop otherwise. - */ - private void runoff() { - Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password); - List<String> data; - do { - data = c.fetch(1000, 10000); - } while (!data.isEmpty() && data.size()!=1); - } - - @Test - @Ignore - public void testFilter() { - System.out.println("testFilter entry..."); - List<String> res; - String filter = "{\"class\":\"Assigned\",\"field\":\"request\"}"; - Consumer c = new DmaapConsumerImpl(urls, "DCAE-CLOSED-LOOP-EVENTS-DEV1510SIM", group, groupId,user,password,filter); - res = c.fetch(2000, 10); - assertFalse(res.isEmpty()); - - res.clear(); - filter = "{\"class\":\"Assigned\",\"field\":\"response\"}"; - c = new DmaapConsumerImpl(urls, "DCAE-CLOSED-LOOP-EVENTS-DEV1510SIM", group, groupId,user,password, filter); - res = c.fetch(2000, 10); - assertTrue(res.isEmpty()); - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapConsumerImpl.java deleted file mode 100644 index 0d486361b..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapConsumerImpl.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Modifications Copyright (C) 2018 IBM - * ================================================================================ - * Modifications Copyright (C) 2019 Ericsson - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.impl; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.Properties; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.onap.appc.configuration.Configuration; -import org.onap.appc.configuration.ConfigurationFactory; -import org.onap.appc.metricservice.MetricRegistry; -import org.onap.appc.metricservice.MetricService; -import org.onap.appc.metricservice.metric.DmaapRequestCounterBuilder; -import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric; -import org.onap.appc.metricservice.metric.MetricBuilderFactory; -import org.onap.appc.metricservice.metric.impl.MetricBuilderFactoryImpl; -import org.onap.appc.metricservice.policy.PolicyBuilderFactory; -import org.onap.appc.metricservice.policy.PublishingPolicy; -import org.onap.appc.metricservice.policy.ScheduledPolicyBuilder; -import org.osgi.framework.FrameworkUtil; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.powermock.reflect.Whitebox; -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.MRConsumer; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({ConfigurationFactory.class, FrameworkUtil.class, MRClientFactory.class}) -public class TestDmaapConsumerImpl { - String[] hostList = { "192.168.1.1" }; - Collection<String> hosts = new HashSet<String>(Arrays.asList(hostList)); - - String topic = "JunitTopicOne"; - String group = "junit-client"; - String id = "junit-consumer-one"; - String key = "key"; - String secret = "secret"; - String filter = null; - - @Test - public void testDmaapConsumerImplNoFilter() { - - DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret); - - assertNotNull(consumer); - - Properties props = consumer.getProperties(); - - assertEquals("192.168.1.1", props.getProperty("host")); - assertEquals("key", props.getProperty("username")); - assertEquals("secret", props.getProperty("password")); - } - - @Test - public void testDmaapConsumerImplwithFilter() { - - filter=""; - DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret, filter); - - assertNotNull(consumer); - - } - - @Test - public void testDmaapConsumerImplNoUserPassword() { - - DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, null, null); - - assertNotNull(consumer); - - Properties props = consumer.getProperties(); - - assertEquals("192.168.1.1", props.getProperty("host")); - assertNull(props.getProperty("username")); - assertNull(props.getProperty("password")); - assertEquals("HTTPNOAUTH", props.getProperty("TransportType")); - } - - @Test - public void testUpdateCredentials() { - DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, null, null); - - assertNotNull(consumer); - - Properties props = consumer.getProperties(); - - assertEquals("192.168.1.1", props.getProperty("host")); - assertNull(props.getProperty("username")); - assertNull(props.getProperty("password")); - - consumer.updateCredentials(key, secret); - - props = consumer.getProperties(); - assertEquals("192.168.1.1", props.getProperty("host")); - assertEquals("key", props.getProperty("username")); - assertEquals("secret", props.getProperty("password")); - } - - @Ignore("test is taking 130 sec") - @Test - public void testFetch() { - DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret); - - assertNotNull(consumer); - - consumer.fetch(5000,500); - } - - @Ignore - @Test - public void testFetchIntInt() { - fail("Not yet implemented"); - } - - @Test - public void testCloseNoClient() { - DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret); - - assertNotNull(consumer); - - consumer.close(); - } - - @Ignore - @Test - public void testCloseWithClient() { - fail("Not yet implemented"); - } - - @Test - public void testToString() { - DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, null, null); - - assertNotNull(consumer); - - assertEquals("Consumer junit-client/junit-consumer-one listening to JunitTopicOne on [192.168.1.1]", - consumer.toString()); - } - - @Test - public void testUseHttps() { - DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret); - - assertNotNull(consumer); - - assertEquals(false, consumer.isHttps()); - - consumer.useHttps(true); - - assertEquals(true, consumer.isHttps()); - - } - - @Test - public void testGetClient() throws FileNotFoundException, IOException - { - DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret); - assertNotNull(consumer); - PowerMockito.mockStatic(MRClientFactory.class); - PowerMockito.when(MRClientFactory.createConsumer(Mockito.anyString())).thenReturn(Mockito.mock(MRConsumer.class)); - assertTrue(consumer.getClient(1000,5) instanceof MRConsumer); - Properties props= consumer.getProperties(); - assertEquals("1000", props.getProperty("timeout")); - assertEquals("5", props.getProperty("limit")); - } - - @Test - public void testGetClientExceptionFlow() throws FileNotFoundException, IOException - { - DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret); - assertNotNull(consumer); - PowerMockito.mockStatic(MRClientFactory.class); - PowerMockito.when(MRClientFactory.createConsumer(Mockito.anyString())).thenThrow(new IOException()); - assertFalse(consumer.getClient(1000,5) instanceof MRConsumer); - Properties props= consumer.getProperties(); - assertEquals("1000", props.getProperty("timeout")); - assertEquals("5", props.getProperty("limit")); - } - - @Test - public void testInitMetric() throws FileNotFoundException, IOException - { - Configuration configuration = Mockito.mock(Configuration.class); - Properties properties = new Properties(); - properties.put("metric.enabled", "true"); - Mockito.when(configuration.getProperties()).thenReturn(properties); - PowerMockito.mockStatic(MRClientFactory.class); - PowerMockito.when(MRClientFactory.createConsumer(Mockito.anyString())).thenThrow(new IOException()); - DmaapConsumerImpl consumer = Mockito.spy(new DmaapConsumerImpl(hosts, topic, group, id, key, secret)); - Whitebox.setInternalState(consumer, "configuration", configuration); - MetricService metricService = Mockito.mock(MetricService.class); - MetricRegistry metricRegistry = Mockito.mock(MetricRegistry.class); - MetricBuilderFactory metricBuilderFactory = Mockito.spy(new MetricBuilderFactoryImpl()); - DmaapRequestCounterBuilder builder = Mockito.mock(DmaapRequestCounterBuilder.class); - DmaapRequestCounterMetric metric = Mockito.mock(DmaapRequestCounterMetric.class); - Mockito.when(builder.withName(Mockito.anyString())).thenReturn(builder); - Mockito.when(builder.withType(Mockito.any())).thenReturn(builder); - Mockito.when(builder.withPublishedMessage(Mockito.anyLong())).thenReturn(builder); - Mockito.when(builder.withRecievedMessage(Mockito.anyLong())).thenReturn(builder); - Mockito.when(builder.build()).thenReturn(metric); - Mockito.when(metricBuilderFactory.dmaapRequestCounterBuilder()).thenReturn(builder); - Mockito.when(metricRegistry.register(Mockito.any())).thenReturn(true); - PublishingPolicy policy = Mockito.mock(PublishingPolicy.class); - PolicyBuilderFactory policyFactory = Mockito.mock(PolicyBuilderFactory.class); - Mockito.when(metricRegistry.policyBuilderFactory()).thenReturn(policyFactory); - ScheduledPolicyBuilder policyBuilder = Mockito.mock(ScheduledPolicyBuilder.class); - Mockito.when(policyBuilder.withPublishers(Mockito.any())).thenReturn(policyBuilder); - Mockito.when(policyBuilder.withMetrics(Mockito.any())).thenReturn(policyBuilder); - Mockito.when(policyBuilder.build()).thenReturn(policy); - Mockito.when(policyFactory.scheduledPolicyBuilder()).thenReturn(policyBuilder); - Mockito.when(metricRegistry.metricBuilderFactory()).thenReturn(metricBuilderFactory); - Mockito.when(metricService.createRegistry("APPC")).thenReturn(metricRegistry); - Mockito.doReturn(metricService).when(consumer).getMetricService(); - consumer.fetch(1, 1); - Mockito.verify(policy).init(); - } - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapProducerImpl.java deleted file mode 100644 index e6e665d68..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapProducerImpl.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ============================================================================= - * Modifications Copyright (C) 2018 IBM. - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.impl; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; - -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -public class TestDmaapProducerImpl { - String[] hostList = { "192.168.1.1" }; - Collection<String> hosts = new HashSet<String>(Arrays.asList(hostList)); - - String topic = "JunitTopicOne"; - String group = "junit-client"; - String id = "junit-consumer-one"; - String key = "key"; - String secret = "secret"; - String filter = null; - - private DmaapProducerImpl producer; - - @Before - public void setUp() { - producer = new DmaapProducerImpl(hosts, topic, null, null); - } - - @Test - public void testDmaapProducerImplSingleTopic() { - producer = new DmaapProducerImpl(hosts, topic, key, secret); - - assertNotNull(producer); - - Properties props = producer.getProperties(); - - assertNotNull(props); - - assertEquals("key", props.getProperty("username")); - assertEquals("secret", props.getProperty("password")); - } - - @Test - public void testDmaapProducerImplMultipleTopic() { - String[] topicList = { "topic1", "topic2" }; - Set<String> topicNames = new HashSet<String>(Arrays.asList(topicList)); - - producer = new DmaapProducerImpl(hosts, topicNames, key, secret); - - assertNotNull(producer); - - Properties props = producer.getProperties(); - - assertNotNull(props); - - assertEquals("key", props.getProperty("username")); - assertEquals("secret", props.getProperty("password")); - - } - - @Test - public void testDmaapProducerImplNoUserPass() { - assertNotNull(producer); - - Properties props = producer.getProperties(); - - assertNotNull(props); - - assertNull(props.getProperty("username")); - assertNull(props.getProperty("password")); - } - - @Test - public void testUpdateCredentials() { - assertNotNull(producer); - - Properties props = producer.getProperties(); - - assertNotNull(props); - - assertNull(props.getProperty("username")); - assertNull(props.getProperty("password")); - - producer.updateCredentials(key, secret); - - props = producer.getProperties(); - - assertNotNull(props); - - assertEquals("key", props.getProperty("username")); - assertEquals("secret", props.getProperty("password")); - - } - - @Test - public void testPost() { - boolean successful = producer.post("partition", "data"); - assertEquals(true, successful); - } - - @Test - public void testCloseNoClient() { - producer = new DmaapProducerImpl(hosts, topic, key, secret); - - assertNotNull(producer); - - producer.close(); - } - - - @Test - public void testCloseWithClient() { - producer.post("partition", "data"); - assertNotNull(producer); - producer.close(); - } - - @Test - public void testUseHttps() { - producer = new DmaapProducerImpl(hosts, topic, key, secret); - - assertNotNull(producer); - - assertEquals(false, producer.isHttps()); - - producer.useHttps(true); - - assertEquals(true, producer.isHttps()); - - } - -}
\ No newline at end of file |