aboutsummaryrefslogtreecommitdiffstats
path: root/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src
diff options
context:
space:
mode:
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src')
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java5
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java244
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java228
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java5
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java136
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java5
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapEventSender.java4
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java5
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImplTest.java8
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java247
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapConsumerImpl.java253
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapProducerImpl.java161
12 files changed, 19 insertions, 1282 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
index a9af8c702..84b5a5ff6 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
@@ -50,6 +50,11 @@ public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer
//for test purposes
}
+ public HttpDmaapProducerImpl(Collection<String> urls, String topicName, String username, String password) {
+ this(urls, topicName);
+ updateCredentials(username, password);
+ }
+
public HttpDmaapProducerImpl(Collection<String> urls, String topicName) {
topics = new HashSet<>();
topics.add(topicName);
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
deleted file mode 100644
index 30455201e..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * ================================================================================
- * Modifications Copyright (C) 2018 IBM
- * ================================================================================
- * Modifications Copyright (C) 2019 Ericsson
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap.impl;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.appc.adapter.message.Consumer;
-import org.onap.appc.adapter.messaging.dmaap.utils.DmaapUtil;
-import org.onap.appc.configuration.Configuration;
-import org.onap.appc.configuration.ConfigurationFactory;
-import org.onap.appc.metricservice.MetricRegistry;
-import org.onap.appc.metricservice.MetricService;
-import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
-import org.onap.appc.metricservice.metric.Metric;
-import org.onap.appc.metricservice.metric.MetricType;
-import org.onap.appc.metricservice.policy.PublishingPolicy;
-import org.onap.appc.metricservice.publisher.LogPublisher;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.ServiceReference;
-
-public class DmaapConsumerImpl implements Consumer {
-
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
- private Configuration configuration = ConfigurationFactory.getConfiguration();
- // Default values
- private static final int DEFAULT_TIMEOUT_MS = 60000;
- private static final int DEFAULT_LIMIT = 1000;
- private String topic;
- private boolean isMetricEnabled = false;
- private boolean useHttps = false;
- private MetricRegistry metricRegistry;
- private MRConsumer client = null;
- private Properties props = null;
-
- public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
- String user, String password) {
-
- this(urls, topicName, consumerGroupName, consumerId, user, password, null);
- }
-
- public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
- String user, String password, String filter) {
-
- this.topic = topicName;
- this.props = new Properties();
- String urlsStr = StringUtils.join(urls, ',');
- props.setProperty("host", urlsStr);
- props.setProperty("group", consumerGroupName);
- props.setProperty("id", consumerId);
- if (user != null && password != null) {
- props.setProperty("username", user);
- props.setProperty("password", password);
- } else {
- props.setProperty("TransportType", "HTTPNOAUTH");
- }
-
- if (filter != null) {
- props.setProperty("filter", filter);
- }
- }
-
- private void initMetric() {
- LOG.debug("Metric getting initialized");
- MetricService metricService = getMetricService();
- if (metricService != null) {
- metricRegistry = metricService.createRegistry("APPC");
-
- DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory()
- .dmaapRequestCounterBuilder().withName("DMAAP_KPI").withType(MetricType.COUNTER)
- .withRecievedMessage(0).withPublishedMessage(0).build();
-
- if (metricRegistry.register(dmaapKpiMetric)) {
- Metric[] metrics = new Metric[] { dmaapKpiMetric };
- LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
- LogPublisher[] logPublishers = new LogPublisher[1];
- logPublishers[0] = logPublisher;
-
- PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory()
- .scheduledPolicyBuilder()
- .withPublishers(logPublishers)
- .withMetrics(metrics)
- .build();
-
- LOG.debug("Policy getting initialized");
- manuallyScheduledPublishingPolicy.init();
- LOG.debug("Metric initialized");
- }
- }
- }
-
- /**
- * @return An instance of MRConsumer created from our class variables.
- */
- synchronized MRConsumer getClient(int waitMs, int limit) {
- try {
- props.setProperty("timeout", String.valueOf(waitMs));
- props.setProperty("limit", String.valueOf(limit));
- String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic, props);
- return MRClientFactory.createConsumer(topicProducerPropFileName);
- } catch (IOException e1) {
- LOG.error("failed to createConsumer", e1);
- return null;
- }
- }
-
- @Override
- public synchronized void updateCredentials(String key, String secret) {
- LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
- props.setProperty("username", String.valueOf(key));
- props.setProperty("password", String.valueOf(secret));
- client = null;
- }
-
- @Override
- public List<String> fetch() {
- return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
- }
-
- @Override
- public List<String> fetch(int waitMs, int limit) {
- Properties properties = configuration.getProperties();
- if (properties != null && properties.getProperty("metric.enabled") != null) {
- isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled"));
- }
- if (isMetricEnabled) {
- initMetric();
- }
- LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
- List<String> out = new ArrayList<>();
-
- // Create client once and reuse it on subsequent fetches. This is
- // to support failover to other servers in the DMaaP cluster.
- if (client == null) {
- LOG.info("Getting DMaaP Client ...");
- client = getClient(waitMs, limit);
- }
- if (client != null) {
- try {
- for (String s : client.fetch(waitMs, limit)) {
- out.add(s);
- incrementReceivedMessage();
- }
- LOG.debug(String.format("Got %d records from %s", out.size(), this.toString()));
- } catch (Exception e) {
- // Connection exception
- LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage()), e);
- try {
- LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs));
- Thread.sleep(waitMs);
- } catch (InterruptedException e2) {
- LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs));
- Thread.currentThread().interrupt();
- }
- }
- }
- return out;
- }
-
- private void incrementReceivedMessage() {
- if (isMetricEnabled && metricRegistry != null) {
- ((DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage();
- }
- }
-
- /**
- * Close consumer Dmaap client.
- */
- @Override
- public void close() {
- LOG.debug("Closing Dmaap consumer client....");
- if (client != null) {
- client.close();
- }
- }
-
- @Override
- public String toString() {
- String hostStr = (props == null || props.getProperty("host") == null ? "N/A" : props.getProperty("host"));
- String group = (props == null || props.getProperty("group") == null ? "N/A" : props.getProperty("group"));
- String id = (props == null || props.getProperty("id") == null ? "N/A" : props.getProperty("id"));
- return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr);
- }
-
- @Override
- public void useHttps(boolean yes) {
- useHttps = yes;
- }
-
- protected MetricService getMetricService() {
- BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext();
- ServiceReference sref = bctx.getServiceReference(MetricService.class.getName());
- if (sref != null) {
- LOG.info("Metric Service from bundlecontext");
- return (MetricService) bctx.getService(sref);
- } else {
- LOG.info("Metric Service error from bundlecontext");
- LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService");
- return null;
- }
- }
-
- public Properties getProperties() {
- return props;
- }
-
- public boolean isHttps() {
- return useHttps;
- }
-
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
deleted file mode 100644
index efff18ed9..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Modifications Copyright (C) 2019 IBM
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap.impl;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientFactory;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.appc.adapter.message.Producer;
-import org.onap.appc.adapter.messaging.dmaap.utils.DmaapUtil;
-import org.onap.appc.configuration.Configuration;
-import org.onap.appc.configuration.ConfigurationFactory;
-import org.onap.appc.metricservice.MetricRegistry;
-import org.onap.appc.metricservice.MetricService;
-import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
-import org.onap.appc.metricservice.metric.Metric;
-import org.onap.appc.metricservice.metric.MetricType;
-import org.onap.appc.metricservice.policy.PublishingPolicy;
-import org.onap.appc.metricservice.publisher.LogPublisher;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.ServiceReference;
-
-public class DmaapProducerImpl implements Producer {
-
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class);
- private static final Configuration configuration = ConfigurationFactory.getConfiguration();
-
- private Set<String> topics;
-
- private Properties props = null;
- private MetricRegistry metricRegistry;
- private boolean useHttps = false;
- private boolean isMetricEnabled = false;
-
- private Set<MRBatchingPublisher> clients;
-
- public DmaapProducerImpl(Collection<String> urls, String topicName, String user, String password) {
- this(urls, (Set<String>) null, user, password);
- this.topics = new HashSet<>();
- if (topicName != null) {
- Collections.addAll(topics, topicName.split(","));
- }
- }
-
- public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) {
- topics = topicNames;
- if (urls == null) {
- throw new IllegalArgumentException("Mandaory argument is null: urls");
- }
- this.props = new Properties();
- String urlsStr = StringUtils.join(urls, ',');
- props.setProperty("host", urlsStr);
- props.setProperty("id", UUID.randomUUID().toString());
- if (user != null && password != null) {
- props.setProperty("username", user);
- props.setProperty("password", password);
- } else {
- props.setProperty("TransportType", "HTTPNOAUTH");
- }
- }
-
- private void initMetric() {
- LOG.debug("Metric getting initialized");
- MetricService metricService = getMetricservice();
- if (metricService != null) {
- metricRegistry = metricService.createRegistry("APPC");
-
- DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory()
- .dmaapRequestCounterBuilder().withName("DMAAP_KPI").withType(MetricType.COUNTER)
- .withRecievedMessage(0).withPublishedMessage(0).build();
-
- if (metricRegistry.register(dmaapKpiMetric)) {
- Metric[] metrics = new Metric[] { dmaapKpiMetric };
- LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
- LogPublisher[] logPublishers = new LogPublisher[1];
- logPublishers[0] = logPublisher;
-
- PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory()
- .scheduledPolicyBuilder().withPublishers(logPublishers).withMetrics(metrics).build();
-
- LOG.debug("Policy getting initialized");
- manuallyScheduledPublishingPolicy.init();
- LOG.debug("Metric initialized");
- }
- }
- }
-
- private Set<MRBatchingPublisher> getClients() {
- Set<MRBatchingPublisher> out = new HashSet<>();
- for (String topic : topics) {
- try {
- String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic, props);
- final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher(topicProducerPropFileName);
- out.add(client);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
- return out;
- }
-
- @Override
- public synchronized void updateCredentials(String key, String secret) {
- LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
- props.setProperty("username", String.valueOf(key));
- props.setProperty("password", String.valueOf(secret));
- clients = null;
- }
-
- @Override
- public boolean post(String partition, String data) {
- LOG.debug("In DmaapProducerImpl.post()");
- boolean success = true;
- Properties properties = configuration.getProperties();
- if (properties != null && properties.getProperty("metric.enabled") != null) {
- isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled"));
- }
- if (isMetricEnabled) {
- initMetric();
- }
-
- // Create clients once and reuse them on subsequent posts. This is
- // to support failover to other servers in the Dmaap cluster.
- if ((clients == null) || (clients.isEmpty())) {
- LOG.info("Getting CambriaBatchingPublisher Clients ...");
- clients = getClients();
- }
- LOG.debug("In DmaapProducerImpl.post()::: before sending to clients");
- for (MRBatchingPublisher client : clients) {
- try {
- LOG.debug(String.format("Posting %s to %s", data, client));
- client.send(partition, data);
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- success = false;
- }
- }
- incrementPublishedMessage();
- return success;
- }
-
- private void incrementPublishedMessage() {
- if (isMetricEnabled && metricRegistry != null) {
- ((DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage();
- }
- }
-
- /**
- * Close producer Dmaap client.
- */
- @Override
- public void close() {
- if ((clients == null) || (clients.isEmpty())) {
- return;
- }
- LOG.debug("Closing Dmaap producer clients....");
- for (MRBatchingPublisher client : clients) {
- try {
- client.close(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client), e);
- Thread.currentThread().interrupt();
- } catch (IOException e) {
- LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client), e);
- }
- }
- }
-
- @Override
- public void useHttps(boolean yes) {
- useHttps = yes;
- }
-
- private MetricService getMetricservice() {
- BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext();
- ServiceReference sref = bctx.getServiceReference(MetricService.class.getName());
- if (sref != null) {
- LOG.info("Metric Service from bundlecontext");
- return (MetricService) bctx.getService(sref);
- } else {
- LOG.info("Metric Service error from bundlecontext");
- LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService");
- return null;
- }
- }
-
- public Properties getProperties() {
- return props;
- }
-
- public boolean isHttps() {
- return useHttps;
- }
-
-} \ No newline at end of file
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java
index 7d8bc763a..bd4c7f08d 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java
@@ -34,6 +34,7 @@ import org.onap.appc.adapter.message.Producer;
import org.onap.appc.adapter.message.event.EventHeader;
import org.onap.appc.adapter.message.event.EventMessage;
import org.onap.appc.adapter.message.event.EventStatus;
+import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl;
import org.onap.appc.configuration.Configuration;
import org.onap.appc.configuration.ConfigurationFactory;
import org.onap.appc.exceptions.APPCException;
@@ -91,7 +92,7 @@ public class EventSenderDmaapImpl implements EventSender
LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS));
LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE));
LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME));
- Producer producer = new DmaapProducerImpl(pool, writeTopic,username, password);
+ Producer producer = new HttpDmaapProducerImpl(pool, writeTopic,username, password);
for (String url : pool) {
if (url.contains("3905") || url.contains("https")) {
@@ -138,7 +139,7 @@ public class EventSenderDmaapImpl implements EventSender
LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS));
LOG.debug(String.format("writeTopic = %s, taken from property: %s", eventTopicName, destination + "." + EVENT_TOPIC_WRITE));
LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME));
- Producer producer = new DmaapProducerImpl(pool, eventTopicName, username, password);
+ Producer producer = new HttpDmaapProducerImpl(pool, eventTopicName, username, password);
for (String url : pool) {
if (url.contains("3905") || url.contains("https")) {
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java
deleted file mode 100644
index 3d50ea588..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/SimpleExamplePublisher.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap;
-
-import java.io.*;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import com.att.nsa.mr.client.MRConsumer;
-import org.json.JSONObject;
-import org.onap.appc.adapter.messaging.dmaap.utils.DmaapUtil;
-
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRPublisher.message;
-
-
-/**
- *An example of how to use the Java publisher.
- */
-public class SimpleExamplePublisher
-{
-
- public static void main(String []args) throws InterruptedException, Exception{
- int msgCount = 1;
- SimpleExamplePublisher publisher = new SimpleExamplePublisher();
-
- int i=0;
-
- String topicProducerPropFileName = DmaapUtil.createProducerPropFile("org.onap.appc.UNIT-TEST", null);
- while (i< msgCount)
- {
- publisher.publishMessage(topicProducerPropFileName,i);
- i++;
- }
-
- fetchMessage();
- }
-
-
- public void publishMessage( String producerFilePath,int count ) throws IOException, InterruptedException, Exception
- {
- // create our publisher
- final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath);
- // publish some messages
- final JSONObject msg1 = new JSONObject ();
- msg1.put ( "Partition:2", "Message:" +count);
- //msg1.put ( "greeting", "Hello .." );
-
- pub.send ( "2", msg1.toString());
- // close the publisher to make sure everything's sent before exiting. The batching
- // publisher interface allows the app to get the set of unsent messages. It could
- // write them to disk, for example, to try to send them later.
- final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS );
- if ( stuck.size () > 0 )
- {
- System.err.println ( stuck.size() + " messages unsent" );
- }
- else
- {
- System.out.println ( "Clean exit; all messages sent." );
- }
- }
-
-
- public static void fetchMessage()
- {
- int count = 0;
-
- try
- {
- String topic = "org.onap.appc.UNIT-TEST";
- Properties props = new Properties();
- props.put("id", "1");
- props.put("group", "group1");
- String topicConsumerPropFileName1 = DmaapUtil.createConsumerPropFile(topic,props);
- final MRConsumer consumer1 = MRClientFactory.createConsumer ( topicConsumerPropFileName1);
-
- props = new Properties();
- props.put("id", "2");
- props.put("group", "group2");
- String topicConsumerPropFileName2 = DmaapUtil.createConsumerPropFile(topic,props);
- final MRConsumer consumer2 = MRClientFactory.createConsumer ( topicConsumerPropFileName2);
-
- for ( String msg : consumer1.fetch () )
- {
- count++;
- System.out.println ( "consumer1 "+count + ": " + msg );
- }
- for ( String msg : consumer2.fetch () )
- {
- count++;
- System.out.println ( "consumer1 "+count + ": " + msg );
- }
-
-
- }
- catch ( Exception x )
- {
- System.out.println("inside cons exc");
- System.err.println ( x.getClass().getName () + ": " + x.getMessage () );
- }
- }
-}
-
-
-
-
-
-
-
-
-
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java
index b669dcdca..ec9740053 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java
@@ -29,7 +29,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.onap.appc.adapter.message.Consumer;
import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapConsumerImpl;
-import org.onap.appc.adapter.messaging.dmaap.impl.DmaapConsumerImpl;
import org.onap.appc.configuration.Configuration;
import org.onap.appc.configuration.ConfigurationFactory;
import org.junit.Ignore;
@@ -45,7 +44,6 @@ import java.util.List;
*/
public class TestDmaapConsuming {
- private static Consumer dmaapConsumer;
private static Consumer httpConsumer;
@BeforeClass
@@ -62,7 +60,6 @@ public class TestDmaapConsuming {
String password = configuration.getProperty("dmaap.appc.password");
httpConsumer = new HttpDmaapConsumerImpl(hosts, topic, consumerName, consumerId, msgFilter);
- dmaapConsumer = new DmaapConsumerImpl(hosts, topic, consumerName, consumerId,user,password,msgFilter);
}
@Test
@@ -74,7 +71,7 @@ public class TestDmaapConsuming {
@Test
@Ignore
public void testFetchMessages() {
- testFetchMessages(dmaapConsumer);
+ testFetchMessages(httpConsumer);
}
private void testFetchMessages(Consumer consumer) {
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapEventSender.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapEventSender.java
index efdc2beda..c6bf6671d 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapEventSender.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapEventSender.java
@@ -35,7 +35,7 @@ import org.onap.appc.adapter.message.Producer;
import org.onap.appc.adapter.message.event.EventHeader;
import org.onap.appc.adapter.message.event.EventMessage;
import org.onap.appc.adapter.message.event.EventStatus;
-import org.onap.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl;
+import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl;
import org.onap.appc.adapter.messaging.dmaap.impl.EventSenderDmaapImpl;
import org.onap.appc.configuration.Configuration;
import org.onap.appc.configuration.ConfigurationFactory;
@@ -74,7 +74,7 @@ public class TestDmaapEventSender {
props.setProperty(EventSenderDmaapImpl.DMAAP_PASSWORD, eventClientSecret);
}
- Producer producer = Mockito.mock(DmaapProducerImpl.class);
+ Producer producer = Mockito.mock(HttpDmaapProducerImpl.class);
producerMap.put(MessageDestination.DCAE.toString(),producer);
Mockito.when(producer.post(Matchers.anyString(), Matchers.anyString())).thenReturn(true);
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java
index 64e7c5358..5b2183e37 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java
@@ -30,7 +30,6 @@ import org.junit.Ignore;
import org.junit.Test;
import org.onap.appc.adapter.message.Producer;
import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl;
-import org.onap.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl;
import org.onap.appc.configuration.Configuration;
import org.onap.appc.configuration.ConfigurationFactory;
@@ -46,7 +45,6 @@ import java.util.List;
public class TestDmaapProducing {
private static Producer httpProducer;
- private static Producer dmaapProducer;
@BeforeClass
public static void setUp() {
@@ -58,7 +56,6 @@ public class TestDmaapProducing {
String user = configuration.getProperty("dmaap.appc.username");
String password = configuration.getProperty("dmaap.appc.password");
- dmaapProducer = new DmaapProducerImpl(hosts, topic,user,password);
httpProducer = new HttpDmaapProducerImpl(hosts, topic);
httpProducer.updateCredentials(user,password);
}
@@ -72,7 +69,7 @@ public class TestDmaapProducing {
@Test
@Ignore
public void testPostMessages() {
- testPostMessage(dmaapProducer);
+ testPostMessage(httpProducer);
}
private void testPostMessage(Producer producer) {
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImplTest.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImplTest.java
index 76e56d85c..eaad4b844 100644
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImplTest.java
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImplTest.java
@@ -37,6 +37,7 @@ import org.onap.appc.adapter.message.MessageDestination;
import org.onap.appc.adapter.message.Producer;
import org.onap.appc.adapter.message.event.EventHeader;
import org.onap.appc.adapter.message.event.EventMessage;
+import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl;
import org.onap.appc.configuration.Configuration;
import org.onap.appc.configuration.ConfigurationFactory;
import org.onap.appc.exceptions.APPCException;
@@ -76,7 +77,12 @@ public class EventSenderDmaapImplTest {
EventHeader eventHeader = Mockito.mock(EventHeader.class);
Mockito.when(eventHeader.getEventId()).thenReturn("EVENT_ID");
Mockito.when(eventMessage.getEventHeader()).thenReturn(eventHeader);
- assertTrue(sender.sendEvent(MessageDestination.DCAE, eventMessage, "TOPIC NAME"));
+ Producer mockProducer = Mockito.mock(HttpDmaapProducerImpl.class);
+ Mockito.when(mockProducer.post(Mockito.anyString(), Mockito.anyString())).thenReturn(true);
+ Map<String,Producer> map = new HashMap<>();
+ map.put(MessageDestination.DCAE.toString(),mockProducer);
+ sender.setProducerMap(map);
+ assertTrue(sender.sendEvent(MessageDestination.DCAE, eventMessage));
}
@Test
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java
deleted file mode 100644
index 9df2f070e..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestConsumerProducerImpl.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * ================================================================================
- * Modifications Copyright (C) 2019 Ericsson
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.UUID;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.onap.appc.adapter.message.Consumer;
-import org.onap.appc.adapter.message.Producer;
-import org.onap.appc.configuration.Configuration;
-import org.onap.appc.configuration.ConfigurationFactory;
-
-
-public class TestConsumerProducerImpl {
-
- private Collection<String> urls;
- private String topicRead;
- private String topicWrite;
- private String group;
- private String groupId;
- private String user;
- private String password;
-
- @Before
- public void setup() {
- System.out.println("setup entry...");
- // urls = new HashSet<String>();
- // urls.add("dmaaphost1");
- // urls.add("dmaaphost2");
- // //remove unavailable dmaap instance for build
- // //urls.add("dmaaphost3");
- //
- // topicRead = "APPC-UNIT-TEST";
- // topicWrite = "APPC-UNIT-TEST";
- // group = "APPC-CLIENT";
- // groupId = "0";
- Configuration configuration = ConfigurationFactory.getConfiguration();
- List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(","));
- urls = new HashSet<String>(hosts);
- topicRead = configuration.getProperty("topic.read");
- topicWrite = configuration.getProperty("topic.write");
- user = configuration.getProperty("dmaap.appc.username");
- password = configuration.getProperty("dmaap.appc.password");
- group = "APPC-CLIENT";
- groupId = "0";
-
-
- runoff();
- }
-
- /**
- * Test that we can read and write and that the messages come back in order
- */
- @Ignore
- @Test
- public void testWriteRead() {
- System.out.println("testWriteRead entry...");
- Producer p = new DmaapProducerImpl(urls, topicWrite,user,password);
-
- String s1 = UUID.randomUUID().toString();
- String s2 = UUID.randomUUID().toString();
- if (p.post("TEST", s1) == false) {
- // try again - 2nd attempt may succeed if cambria client failed over
- p.post("TEST", s1);
- }
- if (p.post("TEST", s2) == false) {
- // try again - 2nd attempt may succeed if cambria client failed over
- p.post("TEST", s2);
- }
-
- Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password);
- List<String> out = c.fetch();
- // if fetch is empty, try again - a 2nd attempt may succeed if
- // cambria client has failed over
- if ((out == null) || out.isEmpty()) {
- out = c.fetch();
- }
-
- assertNotNull(out);
- assertEquals(2, out.size());
- assertEquals(s1, out.get(0));
- assertEquals(s2, out.get(1));
-
- }
-
- /**
- * Test that we can read and write and that the messages come back in order
- */
- @Test
- @Ignore // Https Not support on jenkins server
- public void testWriteReadHttps() {
- System.out.println("testWriteReadHttps entry...");
- Producer p = new DmaapProducerImpl(urls, topicWrite,user,password);
- p.useHttps(true);
-
- String s1 = UUID.randomUUID().toString();
- String s2 = UUID.randomUUID().toString();
- if (p.post("TEST", s1) == false) {
- // try again - 2nd attempt may succeed if cambria client failed over
- p.post("TEST", s1);
- }
- if (p.post("TEST", s2) == false) {
- // try again - 2nd attempt may succeed if cambria client failed over
- p.post("TEST", s2);
- }
-
- Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password);
- c.useHttps(true);
-
- List<String> out = c.fetch();
- // if fetch is empty, try again - a 2nd attempt may succeed if
- // cambria client has failed over
- if ((out == null) || out.isEmpty()) {
- out = c.fetch();
- }
-
- assertNotNull(out);
- assertEquals(2, out.size());
- assertEquals(s1, out.get(0));
- assertEquals(s2, out.get(1));
-
- }
-
- @Test
- @Ignore // requires connection to a live DMaaP server
- public void testBadUrl() {
- System.out.println("testBadUrl entry...");
- urls.clear();
- urls.add("something.local");
-
- // Producer p = new DmaapProducerImpl(urls, topicWrite);
- Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password);
- List<String> result = c.fetch(1000, 1000);
-
- assertNotNull(result);
- assertTrue(result.isEmpty());
- }
-
- @Test
- @Ignore // requires connection to a live DMaaP server
- public void testAuth() {
- System.out.println("testAuth entry...");
- Producer p = new DmaapProducerImpl(urls, topicWrite,user,password);
- Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password);
-
- p.updateCredentials("key", "secret");
- c.updateCredentials("key", "secret");
-
- // TODO - Do some protected dmaap queries when the apis are updated
- }
-
- /**
- * Test DMaaP client failover to another server when a bad url is encountered
-
- */
- @Ignore
- @Test
- public void testFailover() {
- System.out.println("testFailover entry...");
- urls.clear();
- urls.add("openecomp2.org"); // bad url
- urls.add("dmaaphost2");
- Producer p = new DmaapProducerImpl(urls, topicWrite,user,password);
-
- String s1 = UUID.randomUUID().toString();
- if (p.post("TEST", s1) == false) {
- // try again - cambria client should have failed over
- p.post("TEST", s1);
- }
-
- urls.clear();
- urls.add("openecomp3.org"); // bad url
- urls.add("dmaaphost3");
-
- Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password);
- List<String> out = c.fetch(1000, 1000);
- // if fetch is empty, try again - cambria client should have failed over
- if ((out == null) || out.isEmpty()) {
- out = c.fetch();
- }
-
- assertNotNull(out);
- assertEquals(1, out.size());
- assertEquals(s1, out.get(0));
- }
-
- /**
- * Reads through the entire topic so it is clean for testing. WARNING - ONLY USE ON TOPICS WHERE YOU ARE THE ONLY
- * WRITER. Could end in an infinite loop otherwise.
- */
- private void runoff() {
- Consumer c = new DmaapConsumerImpl(urls, topicRead, group, groupId,user,password);
- List<String> data;
- do {
- data = c.fetch(1000, 10000);
- } while (!data.isEmpty() && data.size()!=1);
- }
-
- @Test
- @Ignore
- public void testFilter() {
- System.out.println("testFilter entry...");
- List<String> res;
- String filter = "{\"class\":\"Assigned\",\"field\":\"request\"}";
- Consumer c = new DmaapConsumerImpl(urls, "DCAE-CLOSED-LOOP-EVENTS-DEV1510SIM", group, groupId,user,password,filter);
- res = c.fetch(2000, 10);
- assertFalse(res.isEmpty());
-
- res.clear();
- filter = "{\"class\":\"Assigned\",\"field\":\"response\"}";
- c = new DmaapConsumerImpl(urls, "DCAE-CLOSED-LOOP-EVENTS-DEV1510SIM", group, groupId,user,password, filter);
- res = c.fetch(2000, 10);
- assertTrue(res.isEmpty());
- }
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapConsumerImpl.java
deleted file mode 100644
index 0d486361b..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapConsumerImpl.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Modifications Copyright (C) 2018 IBM
- * ================================================================================
- * Modifications Copyright (C) 2019 Ericsson
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Properties;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.onap.appc.configuration.Configuration;
-import org.onap.appc.configuration.ConfigurationFactory;
-import org.onap.appc.metricservice.MetricRegistry;
-import org.onap.appc.metricservice.MetricService;
-import org.onap.appc.metricservice.metric.DmaapRequestCounterBuilder;
-import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
-import org.onap.appc.metricservice.metric.MetricBuilderFactory;
-import org.onap.appc.metricservice.metric.impl.MetricBuilderFactoryImpl;
-import org.onap.appc.metricservice.policy.PolicyBuilderFactory;
-import org.onap.appc.metricservice.policy.PublishingPolicy;
-import org.onap.appc.metricservice.policy.ScheduledPolicyBuilder;
-import org.osgi.framework.FrameworkUtil;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ConfigurationFactory.class, FrameworkUtil.class, MRClientFactory.class})
-public class TestDmaapConsumerImpl {
- String[] hostList = { "192.168.1.1" };
- Collection<String> hosts = new HashSet<String>(Arrays.asList(hostList));
-
- String topic = "JunitTopicOne";
- String group = "junit-client";
- String id = "junit-consumer-one";
- String key = "key";
- String secret = "secret";
- String filter = null;
-
- @Test
- public void testDmaapConsumerImplNoFilter() {
-
- DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret);
-
- assertNotNull(consumer);
-
- Properties props = consumer.getProperties();
-
- assertEquals("192.168.1.1", props.getProperty("host"));
- assertEquals("key", props.getProperty("username"));
- assertEquals("secret", props.getProperty("password"));
- }
-
- @Test
- public void testDmaapConsumerImplwithFilter() {
-
- filter="";
- DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret, filter);
-
- assertNotNull(consumer);
-
- }
-
- @Test
- public void testDmaapConsumerImplNoUserPassword() {
-
- DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, null, null);
-
- assertNotNull(consumer);
-
- Properties props = consumer.getProperties();
-
- assertEquals("192.168.1.1", props.getProperty("host"));
- assertNull(props.getProperty("username"));
- assertNull(props.getProperty("password"));
- assertEquals("HTTPNOAUTH", props.getProperty("TransportType"));
- }
-
- @Test
- public void testUpdateCredentials() {
- DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, null, null);
-
- assertNotNull(consumer);
-
- Properties props = consumer.getProperties();
-
- assertEquals("192.168.1.1", props.getProperty("host"));
- assertNull(props.getProperty("username"));
- assertNull(props.getProperty("password"));
-
- consumer.updateCredentials(key, secret);
-
- props = consumer.getProperties();
- assertEquals("192.168.1.1", props.getProperty("host"));
- assertEquals("key", props.getProperty("username"));
- assertEquals("secret", props.getProperty("password"));
- }
-
- @Ignore("test is taking 130 sec")
- @Test
- public void testFetch() {
- DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret);
-
- assertNotNull(consumer);
-
- consumer.fetch(5000,500);
- }
-
- @Ignore
- @Test
- public void testFetchIntInt() {
- fail("Not yet implemented");
- }
-
- @Test
- public void testCloseNoClient() {
- DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret);
-
- assertNotNull(consumer);
-
- consumer.close();
- }
-
- @Ignore
- @Test
- public void testCloseWithClient() {
- fail("Not yet implemented");
- }
-
- @Test
- public void testToString() {
- DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, null, null);
-
- assertNotNull(consumer);
-
- assertEquals("Consumer junit-client/junit-consumer-one listening to JunitTopicOne on [192.168.1.1]",
- consumer.toString());
- }
-
- @Test
- public void testUseHttps() {
- DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret);
-
- assertNotNull(consumer);
-
- assertEquals(false, consumer.isHttps());
-
- consumer.useHttps(true);
-
- assertEquals(true, consumer.isHttps());
-
- }
-
- @Test
- public void testGetClient() throws FileNotFoundException, IOException
- {
- DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret);
- assertNotNull(consumer);
- PowerMockito.mockStatic(MRClientFactory.class);
- PowerMockito.when(MRClientFactory.createConsumer(Mockito.anyString())).thenReturn(Mockito.mock(MRConsumer.class));
- assertTrue(consumer.getClient(1000,5) instanceof MRConsumer);
- Properties props= consumer.getProperties();
- assertEquals("1000", props.getProperty("timeout"));
- assertEquals("5", props.getProperty("limit"));
- }
-
- @Test
- public void testGetClientExceptionFlow() throws FileNotFoundException, IOException
- {
- DmaapConsumerImpl consumer = new DmaapConsumerImpl(hosts, topic, group, id, key, secret);
- assertNotNull(consumer);
- PowerMockito.mockStatic(MRClientFactory.class);
- PowerMockito.when(MRClientFactory.createConsumer(Mockito.anyString())).thenThrow(new IOException());
- assertFalse(consumer.getClient(1000,5) instanceof MRConsumer);
- Properties props= consumer.getProperties();
- assertEquals("1000", props.getProperty("timeout"));
- assertEquals("5", props.getProperty("limit"));
- }
-
- @Test
- public void testInitMetric() throws FileNotFoundException, IOException
- {
- Configuration configuration = Mockito.mock(Configuration.class);
- Properties properties = new Properties();
- properties.put("metric.enabled", "true");
- Mockito.when(configuration.getProperties()).thenReturn(properties);
- PowerMockito.mockStatic(MRClientFactory.class);
- PowerMockito.when(MRClientFactory.createConsumer(Mockito.anyString())).thenThrow(new IOException());
- DmaapConsumerImpl consumer = Mockito.spy(new DmaapConsumerImpl(hosts, topic, group, id, key, secret));
- Whitebox.setInternalState(consumer, "configuration", configuration);
- MetricService metricService = Mockito.mock(MetricService.class);
- MetricRegistry metricRegistry = Mockito.mock(MetricRegistry.class);
- MetricBuilderFactory metricBuilderFactory = Mockito.spy(new MetricBuilderFactoryImpl());
- DmaapRequestCounterBuilder builder = Mockito.mock(DmaapRequestCounterBuilder.class);
- DmaapRequestCounterMetric metric = Mockito.mock(DmaapRequestCounterMetric.class);
- Mockito.when(builder.withName(Mockito.anyString())).thenReturn(builder);
- Mockito.when(builder.withType(Mockito.any())).thenReturn(builder);
- Mockito.when(builder.withPublishedMessage(Mockito.anyLong())).thenReturn(builder);
- Mockito.when(builder.withRecievedMessage(Mockito.anyLong())).thenReturn(builder);
- Mockito.when(builder.build()).thenReturn(metric);
- Mockito.when(metricBuilderFactory.dmaapRequestCounterBuilder()).thenReturn(builder);
- Mockito.when(metricRegistry.register(Mockito.any())).thenReturn(true);
- PublishingPolicy policy = Mockito.mock(PublishingPolicy.class);
- PolicyBuilderFactory policyFactory = Mockito.mock(PolicyBuilderFactory.class);
- Mockito.when(metricRegistry.policyBuilderFactory()).thenReturn(policyFactory);
- ScheduledPolicyBuilder policyBuilder = Mockito.mock(ScheduledPolicyBuilder.class);
- Mockito.when(policyBuilder.withPublishers(Mockito.any())).thenReturn(policyBuilder);
- Mockito.when(policyBuilder.withMetrics(Mockito.any())).thenReturn(policyBuilder);
- Mockito.when(policyBuilder.build()).thenReturn(policy);
- Mockito.when(policyFactory.scheduledPolicyBuilder()).thenReturn(policyBuilder);
- Mockito.when(metricRegistry.metricBuilderFactory()).thenReturn(metricBuilderFactory);
- Mockito.when(metricService.createRegistry("APPC")).thenReturn(metricRegistry);
- Mockito.doReturn(metricService).when(consumer).getMetricService();
- consumer.fetch(1, 1);
- Mockito.verify(policy).init();
- }
-
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapProducerImpl.java
deleted file mode 100644
index e6e665d68..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/impl/TestDmaapProducerImpl.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * =============================================================================
- * Modifications Copyright (C) 2018 IBM.
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class TestDmaapProducerImpl {
- String[] hostList = { "192.168.1.1" };
- Collection<String> hosts = new HashSet<String>(Arrays.asList(hostList));
-
- String topic = "JunitTopicOne";
- String group = "junit-client";
- String id = "junit-consumer-one";
- String key = "key";
- String secret = "secret";
- String filter = null;
-
- private DmaapProducerImpl producer;
-
- @Before
- public void setUp() {
- producer = new DmaapProducerImpl(hosts, topic, null, null);
- }
-
- @Test
- public void testDmaapProducerImplSingleTopic() {
- producer = new DmaapProducerImpl(hosts, topic, key, secret);
-
- assertNotNull(producer);
-
- Properties props = producer.getProperties();
-
- assertNotNull(props);
-
- assertEquals("key", props.getProperty("username"));
- assertEquals("secret", props.getProperty("password"));
- }
-
- @Test
- public void testDmaapProducerImplMultipleTopic() {
- String[] topicList = { "topic1", "topic2" };
- Set<String> topicNames = new HashSet<String>(Arrays.asList(topicList));
-
- producer = new DmaapProducerImpl(hosts, topicNames, key, secret);
-
- assertNotNull(producer);
-
- Properties props = producer.getProperties();
-
- assertNotNull(props);
-
- assertEquals("key", props.getProperty("username"));
- assertEquals("secret", props.getProperty("password"));
-
- }
-
- @Test
- public void testDmaapProducerImplNoUserPass() {
- assertNotNull(producer);
-
- Properties props = producer.getProperties();
-
- assertNotNull(props);
-
- assertNull(props.getProperty("username"));
- assertNull(props.getProperty("password"));
- }
-
- @Test
- public void testUpdateCredentials() {
- assertNotNull(producer);
-
- Properties props = producer.getProperties();
-
- assertNotNull(props);
-
- assertNull(props.getProperty("username"));
- assertNull(props.getProperty("password"));
-
- producer.updateCredentials(key, secret);
-
- props = producer.getProperties();
-
- assertNotNull(props);
-
- assertEquals("key", props.getProperty("username"));
- assertEquals("secret", props.getProperty("password"));
-
- }
-
- @Test
- public void testPost() {
- boolean successful = producer.post("partition", "data");
- assertEquals(true, successful);
- }
-
- @Test
- public void testCloseNoClient() {
- producer = new DmaapProducerImpl(hosts, topic, key, secret);
-
- assertNotNull(producer);
-
- producer.close();
- }
-
-
- @Test
- public void testCloseWithClient() {
- producer.post("partition", "data");
- assertNotNull(producer);
- producer.close();
- }
-
- @Test
- public void testUseHttps() {
- producer = new DmaapProducerImpl(hosts, topic, key, secret);
-
- assertNotNull(producer);
-
- assertEquals(false, producer.isHttps());
-
- producer.useHttps(true);
-
- assertEquals(true, producer.isHttps());
-
- }
-
-} \ No newline at end of file