summaryrefslogtreecommitdiffstats
path: root/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src
diff options
context:
space:
mode:
Diffstat (limited to 'services/appc-dmaap-service/appc-dmaap-adapter-bundle/src')
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java104
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java28
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java105
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java154
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java140
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/utils/DmaapUtil.java155
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties56
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/org/onap/appc/default.properties27
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt22
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/producer.properties54
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java46
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java83
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java79
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestAuthenticationException.java35
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestCommonHttpClient.java118
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapConsumerImpl.java153
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapProducerImpl.java135
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/utils/TestDmaapUtil.java268
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/DCAEResponse.txt20
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/consumer.properties54
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/default.properties38
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/producer.properties52
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/test.properties37
23 files changed, 1963 insertions, 0 deletions
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java
new file mode 100644
index 000000000..828cdb773
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java
@@ -0,0 +1,104 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+
+
+
+/**
+ * This activator is used to initialize and terminate the connection pool to one or more providers.
+ * <p>
+ * The CDP abstraction layer supports multiple types of providers, with each provider supporting multiple tenants. The
+ * "connection" to a specific tenant on a specific provider is represented by a "context" object. These context objects
+ * are authenticated to a specific tenant on the provider, but can be reused from one request to another. Contexts are
+ * slow to set up and are resource intensive, so they are cached. However, the contexts for a specific tenant on a
+ * specific provider must be cached separately.
+ * </p>
+ * <p>
+ * Activation of the bundle creates an empty cache which is organized first by provider type, then by tenant name, with
+ * the contents being an empty pool of contexts for that provider/tenant combination. The pool is created on first use,
+ * and retained for as long as the bundle is active.
+ * </p>
+ * <p>
+ * When the bundle is deactivated, the cache is torn down with all contexts being closed.
+ * </p>
+ */
+public class AppcDmaapAdapterActivator implements BundleActivator {
+
+ /**
+ * The logger to be used
+ */
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AppcDmaapAdapterActivator.class);
+
+ /**
+ * Called when this bundle is started so the Framework can perform the bundle-specific activities necessary to start
+ * this bundle. This method can be used to register services or to allocate any resources that this bundle needs.
+ * <p>
+ * This method must complete and return to its caller in a timely manner.
+ * </p>
+ *
+ * @param bundleContext
+ * The execution context of the bundle being started.
+ * @throws java.lang.Exception
+ * If this method throws an exception, this bundle is marked as stopped and the Framework will remove
+ * this bundle's listeners, unregister all services registered by this bundle, and release all services
+ * used by this bundle.
+ * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext)
+ */
+ @Override
+ public void start(final BundleContext bundleContext) throws Exception {
+ LOG.info("Starting Bundle " + getName());
+ }
+
+ /**
+ * Called when this bundle is stopped so the Framework can perform the bundle-specific activities necessary to stop
+ * the bundle. In general, this method should undo the work that the BundleActivator.start method started. There
+ * should be no active threads that were started by this bundle when this bundle returns. A stopped bundle must not
+ * call any Framework objects.
+ * <p>
+ * This method must complete and return to its caller in a timely manner.
+ * </p>
+ *
+ * @param ctx
+ * The execution context of the bundle being stopped.
+ * @throws java.lang.Exception
+ * If this method throws an exception, the bundle is still marked as stopped, and the Framework will
+ * remove the bundle's listeners, unregister all services registered by the bundle, and release all
+ * services used by the bundle. *
+ * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext)
+ */
+ @Override
+ public void stop(BundleContext ctx) throws Exception {
+ LOG.info("Stopped Bundle " + getName());
+ }
+
+ public String getName() {
+ return "DMaaP Adapter";
+ }
+
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java
new file mode 100644
index 000000000..f670d6cdd
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+class AuthenticationException extends Exception {
+
+ public AuthenticationException(String message) {
+ super(message);
+ }
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
new file mode 100644
index 000000000..7c243d08e
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
@@ -0,0 +1,105 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import java.net.URI;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.config.RequestConfig.Builder;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+abstract class CommonHttpClient {
+
+ private static final int HTTP_PORT = 3904;
+ private static final int HTTPS_PORT = 3905;
+ private static final int TIMEOUT_OFFSET = 5000;
+
+ private String authStr;
+
+ protected void setBasicAuth(String username, String password) {
+ if (username != null && password != null) {
+ String plain = String.format("%s:%s", username, password);
+ authStr = Base64.encodeBase64String(plain.getBytes());
+ } else {
+ authStr = null;
+ }
+ }
+
+ protected HttpGet getReq(URI uri, int timeoutMs) throws AuthenticationException {
+
+ HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri);
+ if (authStr != null) {
+ out.setHeader("Authorization", String.format("Basic %s", authStr));
+ }
+
+ out.setConfig(getConfig(timeoutMs));
+ return out;
+ }
+
+ protected HttpPost postReq(String url) throws AuthenticationException {
+
+ HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url);
+ if (authStr != null) {
+ out.setHeader("Authorization", String.format("Basic %s", authStr));
+ }
+ out.setConfig(getConfig(0));
+ return out;
+ }
+
+ private RequestConfig getConfig(int timeoutMs) {
+ Builder builder = RequestConfig.custom();
+ builder.setSocketTimeout(timeoutMs + TIMEOUT_OFFSET);
+ return builder.build();
+ }
+
+ protected CloseableHttpClient getClient() {
+ return HttpClientBuilder.create().build();
+ }
+
+ protected String formatHostString(String host) {
+ return formatHostString(host, host.contains(String.valueOf(HTTPS_PORT)));
+ }
+
+ private String formatHostString(String host, boolean useHttps) {
+ // Trim trailing slash
+ String out = host.endsWith("/") ? host.substring(0, host.length() - 1) : host;
+
+ boolean hasProtocol = out.startsWith("http");
+ boolean hasPort = out.contains(":");
+
+ // Add protocol
+ if (!hasProtocol) {
+ out = String.format("%s%s", (useHttps) ? "https://" : "http://", out);
+ }
+ // Add port
+ if (!hasPort) {
+ out = String.format("%s:%d", out, (useHttps) ? HTTPS_PORT : HTTP_PORT);
+ }
+ return out;
+ }
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
new file mode 100644
index 000000000..e2a20ee45
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
@@ -0,0 +1,154 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.http.HttpEntity;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+import org.json.JSONArray;
+import org.onap.appc.adapter.message.Consumer;
+
+public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer {
+
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
+
+ // Default values
+ private static final int DEFAULT_TIMEOUT_MS = 15000;
+ private static final int DEFAULT_LIMIT = 1000;
+ private static final String URL_TEMPLATE = "%s/events/%s/%s/%s";
+
+ private List<String> urls;
+ private String filter;
+
+ public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
+ String filter) {
+
+ this(hosts, topicName, consumerName, consumerId, filter, null, null);
+ }
+
+ public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
+ String filter, String user, String password) {
+
+ urls = new ArrayList<>();
+ for (String host : hosts) {
+ urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
+ }
+ this.filter = filter;
+ updateCredentials(user, password);
+ }
+
+ @Override
+ public void updateCredentials(String user, String pass) {
+ LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
+ this.setBasicAuth(user, pass);
+ }
+
+ @Override
+ public List<String> fetch(int waitMs, int limit) {
+ LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
+ List<String> out = new ArrayList<>();
+ try {
+ List<NameValuePair> urlParams = new ArrayList<>();
+ urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs)));
+ urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit)));
+ if (filter != null) {
+ urlParams.add(new BasicNameValuePair("filter", filter));
+ }
+
+ URIBuilder builder = new URIBuilder(urls.get(0));
+ builder.setParameters(urlParams);
+
+ URI uri = builder.build();
+ LOG.info(String.format("GET %s", uri));
+ HttpGet request = getReq(uri, waitMs);
+ CloseableHttpResponse response = getClient().execute(request);
+
+ int httpStatus = response.getStatusLine().getStatusCode();
+ HttpEntity entity = response.getEntity();
+ String body = (entity != null) ? entityToString(entity) : null;
+
+ LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
+ body != null ? body.length() : "null"));
+
+ response.close();
+ if (httpStatus == 200 && body != null) {
+ JSONArray json = new JSONArray(body);
+ LOG.info(String.format("Got %d messages from DMaaP", json.length()));
+ for (int i = 0; i < json.length(); i++) {
+ out.add(json.getString(i));
+ }
+ } else {
+ LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
+ sleep(waitMs);
+ }
+ } catch (Exception e) {
+ if (urls.size() > 1) {
+ String failedUrl = urls.remove(0);
+ urls.add(failedUrl);
+ LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
+ urls.get(0)));
+ }
+ LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
+ sleep(waitMs);
+ }
+ return out;
+ }
+
+ @Override
+ public List<String> fetch() {
+ return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
+ }
+
+ @Override
+ public String toString() {
+ String hostStr = (urls == null || urls.isEmpty()) ? "N/A" : urls.get(0);
+ return String.format("Consumer listening to [%s]", hostStr);
+ }
+
+ String entityToString(HttpEntity entity) throws IOException {
+ return EntityUtils.toString(entity);
+ }
+
+ private void sleep(int ms) {
+ LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000));
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e1) {
+ LOG.error("Interrupted while sleeping", e1);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
new file mode 100644
index 000000000..84b5a5ff6
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
@@ -0,0 +1,140 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.onap.appc.adapter.message.Producer;
+
+public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer {
+
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class);
+
+ private static final String CONTENT_TYPE = "application/cambria";
+ private static final String URL_TEMPLATE = "%s/events/%s";
+
+ private List<String> hosts;
+ private Set<String> topics;
+
+ public HttpDmaapProducerImpl() {
+ //for test purposes
+ }
+
+ public HttpDmaapProducerImpl(Collection<String> urls, String topicName, String username, String password) {
+ this(urls, topicName);
+ updateCredentials(username, password);
+ }
+
+ public HttpDmaapProducerImpl(Collection<String> urls, String topicName) {
+ topics = new HashSet<>();
+ topics.add(topicName);
+
+ hosts = new ArrayList<>();
+ for (String host : urls) {
+ hosts.add(formatHostString(host));
+ }
+ }
+
+ @Override
+ public void updateCredentials(String user, String pass) {
+ LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
+ this.setBasicAuth(user, pass);
+ }
+
+ @Override
+ public boolean post(String partition, String data) {
+ LOG.debug("Entering HttpDmaapProducerImpl::: post ");
+ long sent = 0;
+ try {
+ HttpPost request = postReq(null);
+ request.setHeader("Content-Type", CONTENT_TYPE);
+ request.setEntity(new StringEntity(bodyLine(partition, data)));
+
+ LOG.debug("Before sendRequest HttpDmaapProducerImpl::: post ");
+ sent = topics.stream()
+ .filter(topic -> sendRequest(request, topic))
+ .count();
+
+ } catch (Exception buildEx) {
+ LOG.error(
+ String.format("Failed to build request with string [%s]. Message not sent to any topic. Reason: %s",
+ data, buildEx.getMessage()),
+ buildEx);
+ }
+ LOG.debug("Exiting HttpDmaapProducerImpl::: post ");
+ return sent == topics.size();
+ }
+
+ private boolean sendRequest(HttpPost request, String topic) {
+ boolean successful = false;
+ String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic);
+ try {
+ request.setURI(new URI(uriStr));
+ LOG.debug("HttpDmaapProducerImpl::: before sendRequest()");
+ CloseableHttpResponse response = getClient().execute(request);
+ LOG.debug("HttpDmaapProducerImpl::: after sendRequest()");
+ if (response.getStatusLine().getStatusCode() == 200) {
+ successful = true;
+ }
+ else {
+ LOG.debug("HttpDmaapProducerImpl::: did not receive 200 for sendRequest");
+ }
+ response.close();
+ } catch (Exception sendEx) {
+ LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()),
+ sendEx);
+ if (hosts.size() > 1) {
+ String failedUrl = hosts.remove(0);
+ hosts.add(failedUrl);
+ LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s",
+ failedUrl, hosts.get(0)));
+ }
+ }
+ return successful;
+ }
+
+ /**
+ * Format the body for the application/cambria content type with no partitioning. See
+ *
+ * @param message
+ * The message body to format
+ * @return A string in the application/cambria content type
+ */
+ private String bodyLine(String partition, String message) {
+ String prt = (partition == null) ? "" : partition;
+ String msg = (message == null) ? "" : message;
+ return String.format("%d.%d.%s%s", prt.length(), msg.length(), prt, msg);
+ }
+
+
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/utils/DmaapUtil.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/utils/DmaapUtil.java
new file mode 100644
index 000000000..af20ac7c6
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/utils/DmaapUtil.java
@@ -0,0 +1,155 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.onap.appc.configuration.Configuration;
+import org.onap.appc.configuration.ConfigurationFactory;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+public class DmaapUtil {
+
+ private static final char DELIMITER = '_';
+
+ static final String DMAAP_PROPERTIES_PATH = "org.onap.appc.dmaap.profile.path";
+
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DmaapUtil.class);
+
+ private DmaapUtil() {
+ }
+
+ private static String createPreferredRouteFileIfNotExist(String topic) throws IOException {
+ String topicPreferredRouteFileName;
+ topicPreferredRouteFileName = topic + "preferredRoute.properties";
+ File fo = new File(topicPreferredRouteFileName);
+ if (!fo.exists()) {
+ ClassLoader classLoader = DmaapUtil.class.getClassLoader();
+ InputStream inputStream = classLoader.getResourceAsStream("preferredRoute.txt");
+ Properties props = new Properties();
+ props.load(inputStream);
+ String fileName = topic != null ? topic + DELIMITER + "MR1" : DELIMITER + "MR1";
+ props.setProperty("preferredRouteKey", fileName);
+ topicPreferredRouteFileName = topic + "preferredRoute.properties";
+ props.store(new FileOutputStream(topicPreferredRouteFileName),
+ "preferredRoute.properties -This file is generated automatically for topic:" + topic + " on:"
+ + System.currentTimeMillis());
+ }
+ return topicPreferredRouteFileName;
+ }
+
+ public static String createConsumerPropFile(String topic, Properties props) throws IOException {
+ String defaultProfFileName = "consumer.properties";
+
+ log.debug("Creating DMaaP Consumer Property File for topic " + topic);
+ return createConsumerProducerPropFile(topic, defaultProfFileName, props);
+ }
+
+ public static String createProducerPropFile(String topic, Properties props) throws IOException {
+ String defaultProfFileName = "producer.properties";
+
+ log.debug("Creating DMaaP Producer Property File for topic " + topic);
+ return createConsumerProducerPropFile(topic, defaultProfFileName, props);
+ }
+
+ private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props)
+ throws IOException {
+ Properties defaultProps = getDefaultProperties(defaultProfFileName);
+
+ defaultProps.setProperty("topic", topic);
+
+ String preferredRouteFileName = DmaapUtil.createPreferredRouteFileIfNotExist(topic);
+ if (props != null && !props.isEmpty()) {
+ defaultProps.putAll(props);
+ }
+ defaultProps.setProperty("topic", topic);
+ defaultProps.setProperty("DME2preferredRouterFilePath", preferredRouteFileName);
+ String id = defaultProps.getProperty("id");
+ String topicConsumerPropFileName = defaultProfFileName;
+ topicConsumerPropFileName = id != null ? id + DELIMITER + topicConsumerPropFileName
+ : DELIMITER + topicConsumerPropFileName;
+ topicConsumerPropFileName = topic != null ? topic + DELIMITER + topicConsumerPropFileName
+ : DELIMITER + topicConsumerPropFileName;
+
+ defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName
+ + " - This file is generated automatically for topic:" + topic + " on:" + System.currentTimeMillis());
+ return topicConsumerPropFileName;
+ }
+
+ private static Properties getDefaultProperties(String profileName) {
+ Properties props = new Properties();
+
+ // use appc configuration to get all properties which includes
+ // appc.properties and system properties
+ // allowing variable to be set in any location
+ Configuration config = ConfigurationFactory.getConfiguration();
+ String dmaapPropPath = config.getProperty(DMAAP_PROPERTIES_PATH);
+
+ if (dmaapPropPath != null) {
+ // load from file system
+
+ File profileFile = new File(dmaapPropPath, profileName);
+ FileInputStream inputStream = null;
+
+ log.info("Loading DMaaP Profile from " + profileFile.getAbsolutePath());
+
+ if (profileFile.exists()) {
+ try {
+ inputStream = new FileInputStream(profileFile);
+ props.load(inputStream);
+ } catch (IOException e) {
+ log.error("Exception loading DMaaP Profile from " + profileFile.getAbsolutePath(), e);
+ } finally {
+ try {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ } catch (IOException ex) {
+ log.warn("Exception closing DMaaP Profile file " + profileFile.getAbsolutePath(), ex);
+ }
+ }
+ }
+ }
+ if (props.isEmpty()) {
+ // load default Profile from class
+ log.info("Loading Default DMaaP Profile");
+
+ ClassLoader classLoader = DmaapUtil.class.getClassLoader();
+ InputStream inputStream = classLoader.getResourceAsStream(profileName);
+ try {
+ props.load(inputStream);
+ } catch (IOException e) {
+ log.error("Exception loading Default DMaaP Profile", e);
+ }
+ }
+
+ return props;
+ }
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties
new file mode 100644
index 000000000..b19a33505
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties
@@ -0,0 +1,56 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH >
+TransportType=HTTPNOAUTH
+Latitude =50.000000
+Longitude =-100.000000
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events
+Environment =TEST
+Partner=BOT_R
+routeOffer=MR1
+SubContextPath =/
+Protocol =http
+MethodType =GET
+username =admin
+password =admin
+contenttype =application/json
+authKey=01234567890abcde:01234567890abcdefghijklmn
+authDate=2016-02-18T13:57:37-0800
+host=127.0.0.1
+topic=org.onap.appc.UNIT-TEST
+group=jmsgrp
+id=2
+timeout=15000
+limit=1000
+filter=
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=preferredRoute.txt
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/org/onap/appc/default.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/org/onap/appc/default.properties
new file mode 100644
index 000000000..218cafe6e
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/org/onap/appc/default.properties
@@ -0,0 +1,27 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+# ${user.home} usually goes to /root if instantiation uses the appc-docker approach
+
+org.onap.appc.bootstrap.file=appc.properties
+org.onap.appc.bootstrap.path=/opt/onap/appc/data/properties,${user.home},.
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt
new file mode 100644
index 000000000..7e6ed8bdf
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt
@@ -0,0 +1,22 @@
+# ============LICENSE_START==========================================
+# ONAP : APPC
+# ===================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the License);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+# ============LICENSE_END============================================
+preferredRouteKey=MR1
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/producer.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/producer.properties
new file mode 100644
index 000000000..129ec9c6b
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/producer.properties
@@ -0,0 +1,54 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH >
+TransportType=HTTPNOAUTH
+Latitude =50.000000
+Longitude =-100.000000
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events
+Environment =TEST
+Partner=BOT_R
+SubContextPath =/
+Protocol =http
+MethodType =POST
+username =admin
+password =admin
+contenttype = application/json
+authKey=01234567890abcde:01234567890abcdefghijklmn
+authDate=2016-07-20T11:30:56-0700
+host=127.0.0.1
+topic=org.onap.appc.UNIT-TEST
+partition=2
+maxBatchSize=100
+maxAgeMs=250
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=preferredRoute.txt
+MessageSentThreadOccurance=50
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java
new file mode 100644
index 000000000..e5ee0680c
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java
@@ -0,0 +1,46 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+import org.onap.appc.adapter.messaging.dmaap.AppcDmaapAdapterActivator;
+
+public class TestAppcDmaapAdapterActivator {
+
+ @Test
+ public void test_dmaap_activator() {
+ // This does nothing since the activator does nothing
+ AppcDmaapAdapterActivator appc = new AppcDmaapAdapterActivator();
+ try {
+ appc.start(null);
+ appc.stop(null);
+ } catch (Exception e) {
+ fail("Got exception when starting stopping. " + e.getMessage());
+ }
+ assertNotNull(appc.getName());
+ }
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java
new file mode 100644
index 000000000..ec9740053
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java
@@ -0,0 +1,83 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap;
+
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.appc.adapter.message.Consumer;
+import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapConsumerImpl;
+import org.onap.appc.configuration.Configuration;
+import org.onap.appc.configuration.ConfigurationFactory;
+import org.junit.Ignore;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Must have a DMaaP cluster or simulator up and running
+ * Update the hostname, topic, client properties in
+ * resources/org/onap/appc/default.properties
+ *
+ */
+public class TestDmaapConsuming {
+
+ private static Consumer httpConsumer;
+
+ @BeforeClass
+ public static void setUp() {
+
+ Configuration configuration = ConfigurationFactory.getConfiguration();
+
+ List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(","));
+ String topic = configuration.getProperty("topic.read");
+ String consumerName = configuration.getProperty("client.name");
+ String consumerId = configuration.getProperty("client.name.id");
+ String msgFilter = configuration.getProperty("message.filter");
+ String user = configuration.getProperty("dmaap.appc.username");
+ String password = configuration.getProperty("dmaap.appc.password");
+
+ httpConsumer = new HttpDmaapConsumerImpl(hosts, topic, consumerName, consumerId, msgFilter);
+ }
+
+ @Test
+ @Ignore
+ public void testHttpFetchMessages() {
+ testFetchMessages(httpConsumer);
+ }
+
+ @Test
+ @Ignore
+ public void testFetchMessages() {
+ testFetchMessages(httpConsumer);
+ }
+
+ private void testFetchMessages(Consumer consumer) {
+ List<String> messages = consumer.fetch(1000, 100);
+ Assert.assertNotNull(messages);
+ Assert.assertFalse(messages.isEmpty());
+ }
+
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java
new file mode 100644
index 000000000..5b2183e37
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java
@@ -0,0 +1,79 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap;
+
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onap.appc.adapter.message.Producer;
+import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl;
+import org.onap.appc.configuration.Configuration;
+import org.onap.appc.configuration.ConfigurationFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Must have a DMaaP cluster or simulator up and running
+ * Update the hostname, topic, client properties in
+ * resources/org/onap/appc/default.properties
+ *
+ */
+public class TestDmaapProducing {
+
+ private static Producer httpProducer;
+
+ @BeforeClass
+ public static void setUp() {
+
+ Configuration configuration = ConfigurationFactory.getConfiguration();
+
+ List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(","));
+ String topic = configuration.getProperty("topic.write");
+ String user = configuration.getProperty("dmaap.appc.username");
+ String password = configuration.getProperty("dmaap.appc.password");
+
+ httpProducer = new HttpDmaapProducerImpl(hosts, topic);
+ httpProducer.updateCredentials(user,password);
+ }
+
+ @Test
+ @Ignore
+ public void testHttpPostMessage() {
+ testPostMessage(httpProducer);
+ }
+
+ @Test
+ @Ignore
+ public void testPostMessages() {
+ testPostMessage(httpProducer);
+ }
+
+ private void testPostMessage(Producer producer) {
+ Assert.assertTrue(producer.post("partition", "{\"message\": \"Hello, world!\"}"));
+ }
+
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestAuthenticationException.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestAuthenticationException.java
new file mode 100644
index 000000000..448af3022
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestAuthenticationException.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START==========================================
+ * org.onap.music
+ * ===================================================================
+ * Copyright (c) 2019 IBM.
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAuthenticationException {
+
+ @Test
+ public void testAuthException() {
+ AuthenticationException authException = new AuthenticationException("AuthenticationException");
+ Assert.assertEquals("AuthenticationException", authException.getMessage());
+ }
+
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestCommonHttpClient.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestCommonHttpClient.java
new file mode 100644
index 000000000..660a6e4bc
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestCommonHttpClient.java
@@ -0,0 +1,118 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import static org.junit.Assert.*;
+
+import java.net.URI;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCommonHttpClient {
+
+ private static final String HTTP = "http://";
+ private static final String HTTPS = "https://";
+ private static final String URL = "example.org/location";
+ private static final URI URI = java.net.URI.create(HTTP + URL);
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final int TIMEOUT = 15000;
+ private static final int TIMEOUT_OFFSET = 5000;
+ private static final int HTTP_PORT = 3904;
+ private static final int HTTPS_PORT = 3905;
+
+ private CommonHttpClient commonHttpClient;
+
+ @Before
+ public void setUp() {
+ commonHttpClient = new CommonHttpClient() {};
+ }
+
+ private void setBasicAuth() {
+ commonHttpClient.setBasicAuth(USERNAME, PASSWORD);
+ }
+
+ private void noBasicAuth() {
+ commonHttpClient.setBasicAuth(null, null);
+ }
+
+ @Test
+ public void shouldGetHttpRequest_whenSetBasicAuth() throws AuthenticationException {
+
+ setBasicAuth();
+
+ HttpGet httpGet = commonHttpClient.getReq(URI, TIMEOUT);
+
+ assertNotNull(httpGet);
+ assertNotNull(httpGet.getFirstHeader("Authorization"));
+ assertNotNull(httpGet.getConfig());
+ assertEquals(httpGet.getConfig().getSocketTimeout(), TIMEOUT + TIMEOUT_OFFSET);
+ }
+
+ @Test
+ public void shouldPostHttpRequest_whenSetBasicAuth() throws AuthenticationException {
+
+ setBasicAuth();
+
+ HttpPost httpPost = commonHttpClient.postReq(URL);
+
+ assertNotNull(httpPost);
+ assertNotNull(httpPost.getFirstHeader("Authorization"));
+ assertNotNull(httpPost.getConfig());
+ assertEquals(httpPost.getConfig().getSocketTimeout(), TIMEOUT_OFFSET);
+ }
+
+ @Test
+ public void shouldGetClient() {
+ assertNotNull(commonHttpClient.getClient());
+ }
+
+ @Test
+ public void shouldFormatHostString() {
+ String httpUrl = HTTP + URL + ":" + HTTP_PORT;
+ String httpsUrl = HTTPS + URL + ":" + HTTPS_PORT;
+ String outputUrl;
+
+ outputUrl = commonHttpClient.formatHostString(httpUrl);
+ assertTrue(assertMessage(httpUrl, outputUrl), httpUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(httpsUrl);
+ assertTrue(assertMessage(httpsUrl, outputUrl), httpsUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(httpsUrl + "/");
+ assertTrue(assertMessage(httpsUrl, outputUrl), httpsUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(URL + ":" + HTTP_PORT);
+ assertTrue(assertMessage(httpUrl, outputUrl), httpUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(URL + ":" + HTTPS_PORT);
+ assertTrue(assertMessage(httpsUrl, outputUrl), httpsUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(URL);
+ assertTrue(assertMessage(httpUrl, outputUrl), httpUrl.equals(outputUrl));
+ }
+
+ private String assertMessage(String expected, String actual) {
+ return "Expected: " + expected + " Actual: " + actual;
+ }
+} \ No newline at end of file
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapConsumerImpl.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapConsumerImpl.java
new file mode 100644
index 000000000..ea9a5e921
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapConsumerImpl.java
@@ -0,0 +1,153 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import static javax.ws.rs.core.Response.Status.FORBIDDEN;
+import static javax.ws.rs.core.Response.Status.OK;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import org.apache.http.HttpEntity;
+import org.apache.http.StatusLine;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+
+public class TestHttpDmaapConsumerImpl {
+
+ private static final Collection<String> URLS = Arrays.asList("test.com", "test.org");
+ private static final Collection<String> OUTPUT_MSG = Arrays.asList("FirstMessage", "SecondMessage");
+ private static final String TOPIC_NAME = "Topic";
+ private static final String CONSUMER_NAME = "Consumer";
+ private static final String CONSUMER_ID = "Id";
+ private static final String FILTER = "filter";
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final String MESSAGE_BODY = "[FirstMessage, SecondMessage]";
+ private static final int TIMEOUT_MS = 1000;
+ private static final int LIMIT = 1000;
+
+ @Spy
+ private HttpDmaapConsumerImpl httpDmaapConsumer;
+
+ @Mock
+ private CloseableHttpClient httpClient;
+
+ @Mock
+ private CloseableHttpResponse httpResponse;
+
+ @Mock
+ private StatusLine statusLine;
+
+ @Mock
+ private HttpEntity entity;
+
+ @Before
+ public void setUp() throws Exception {
+ httpDmaapConsumer = new HttpDmaapConsumerImpl(URLS, TOPIC_NAME, CONSUMER_NAME, CONSUMER_ID, FILTER);
+ httpDmaapConsumer.updateCredentials(USERNAME, PASSWORD);
+
+ MockitoAnnotations.initMocks(this);
+ doReturn(httpClient).when(httpDmaapConsumer).getClient();
+ when(httpClient.execute(any(HttpGet.class))).thenReturn(httpResponse);
+ when(httpResponse.getStatusLine()).thenReturn(statusLine);
+ when(httpResponse.getEntity()).thenReturn(entity);
+ doReturn(MESSAGE_BODY).when(httpDmaapConsumer).entityToString(same(entity));
+ }
+
+ @Test
+ public void shouldGetHttpRequest() throws Exception {
+
+ when(statusLine.getStatusCode()).thenReturn(OK.getStatusCode());
+
+ List<String> output = httpDmaapConsumer.fetch();
+
+ assertFalse(output.isEmpty());
+ assertTrue(output.containsAll(OUTPUT_MSG));
+ verify(httpClient).execute(any(HttpGet.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).getEntity();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine, entity);
+ }
+
+ @Test
+ public void shouldNotBeSuccessful_whenHttpResponseIsOtherThanOk() throws Exception {
+
+ when(statusLine.getStatusCode()).thenReturn(FORBIDDEN.getStatusCode());
+
+ List<String> output = httpDmaapConsumer.fetch(TIMEOUT_MS, LIMIT);
+
+ assertTrue(output.isEmpty());
+ verify(httpClient).execute(any(HttpGet.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).getEntity();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine, entity);
+ }
+
+ @Test
+ public void shouldNotBeSuccessful_whenRequestToOneOfUrlsCannotBeSent() throws Exception {
+
+ when(httpClient.execute(any(HttpGet.class))).thenThrow(new ClientProtocolException());
+
+ List<String> output = httpDmaapConsumer.fetch(TIMEOUT_MS, LIMIT);
+
+ assertTrue(output.isEmpty());
+ verify(httpClient).execute(any(HttpGet.class));
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine, entity);
+
+
+ reset(httpClient);
+ when(httpClient.execute(any(HttpGet.class))).thenReturn(httpResponse);
+ when(statusLine.getStatusCode()).thenReturn(OK.getStatusCode());
+
+ output = httpDmaapConsumer.fetch(TIMEOUT_MS, LIMIT);
+
+ assertFalse(output.isEmpty());
+ assertTrue(output.containsAll(OUTPUT_MSG));
+ verify(httpClient).execute(any(HttpGet.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).getEntity();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine, entity);
+ }
+
+} \ No newline at end of file
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapProducerImpl.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapProducerImpl.java
new file mode 100644
index 000000000..01e41a857
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapProducerImpl.java
@@ -0,0 +1,135 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import static javax.ws.rs.core.Response.Status.FORBIDDEN;
+import static javax.ws.rs.core.Response.Status.OK;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.http.StatusLine;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+
+public class TestHttpDmaapProducerImpl {
+
+ private static final Collection<String> URLS = Arrays.asList("test.com", "test.org");
+ private static final String TOPIC_NAME = "Topic";
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final String PARTITION = "partition";
+ private static final String DATA = "data";
+
+
+ @Spy
+ private HttpDmaapProducerImpl httpDmaapProducer = new HttpDmaapProducerImpl();
+
+ @Mock
+ private CloseableHttpClient httpClient;
+
+ @Mock
+ private CloseableHttpResponse httpResponse;
+
+ @Mock
+ private StatusLine statusLine;
+
+ @Before
+ public void setUp() throws Exception {
+ httpDmaapProducer = new HttpDmaapProducerImpl(URLS, TOPIC_NAME);
+ httpDmaapProducer.updateCredentials(USERNAME, PASSWORD);
+
+ MockitoAnnotations.initMocks(this);
+ doReturn(httpClient).when(httpDmaapProducer).getClient();
+ when(httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+ when(httpResponse.getStatusLine()).thenReturn(statusLine);
+ }
+
+ @Test
+ public void shouldPostHttpRequest() throws Exception {
+
+ when(statusLine.getStatusCode()).thenReturn(OK.getStatusCode());
+
+ boolean successful = httpDmaapProducer.post(PARTITION, DATA);
+
+ assertTrue(successful);
+ verify(httpClient).execute(any(HttpPost.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine);
+ }
+
+ @Test
+ public void shouldNotBeSuccessful_whenHttpResponseIsOtherThanOk() throws Exception {
+
+ when(statusLine.getStatusCode()).thenReturn(FORBIDDEN.getStatusCode());
+
+ boolean successful = httpDmaapProducer.post(PARTITION, DATA);
+
+ assertFalse(successful);
+ verify(httpClient).execute(any(HttpPost.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine);
+ }
+
+ @Test
+ public void shouldNotBeSuccessful_whenRequestToOneOfUrlsCannotBeSent() throws Exception {
+
+ when(httpClient.execute(any(HttpPost.class))).thenThrow(new ClientProtocolException());
+
+ boolean successful = httpDmaapProducer.post(PARTITION, DATA);
+
+ assertFalse(successful);
+ verify(httpClient).execute(any(HttpPost.class));
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine);
+
+
+ reset(httpClient);
+ when(httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+ when(statusLine.getStatusCode()).thenReturn(OK.getStatusCode());
+
+ successful = httpDmaapProducer.post(PARTITION, DATA);
+
+ assertTrue(successful);
+ verify(httpClient).execute(any(HttpPost.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine);
+ }
+} \ No newline at end of file
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/utils/TestDmaapUtil.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/utils/TestDmaapUtil.java
new file mode 100644
index 000000000..1567242b5
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/utils/TestDmaapUtil.java
@@ -0,0 +1,268 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.util.Properties;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.appc.configuration.ConfigurationFactory;
+
+public class TestDmaapUtil {
+ private static Class<?> configurationFactoryClass;
+ private static Field configField;
+
+ @Test
+ public void testCreateConsumerPropFile() {
+ String topic = "JunitTopicOne";
+ Properties junitProps = new Properties();
+ junitProps.put("host", "192.168.10.10");
+ junitProps.put("group", "junit-client");
+ junitProps.put("id", "junit-consumer-one");
+ junitProps.put("filter", "none");
+
+ String junitFile = null;
+
+ // ensure file path property is not set
+ if (System.getProperty(DmaapUtil.DMAAP_PROPERTIES_PATH) != null) {
+ System.clearProperty(DmaapUtil.DMAAP_PROPERTIES_PATH);
+
+ // set configuration to null to force reloading of properties
+ try {
+ configField.set(null, null);
+ } catch (IllegalArgumentException | IllegalAccessException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+ }
+ try {
+ junitFile = DmaapUtil.createConsumerPropFile(topic, junitProps);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Exception creating consumer property file");
+ }
+
+ assertNotNull(junitFile);
+
+ // open file and verify properties
+ File testFile = new File(junitFile);
+ assertTrue(testFile.exists());
+
+ InputStream is = null;
+ Properties testProps = new Properties();
+ try {
+ is = new FileInputStream(testFile);
+ testProps.load(is);
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ fail("Exception opening consumer property file");
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Exception opening consumer property file");
+ } finally {
+ try {
+ if (is != null) {
+ is.close();
+ }
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ fail("Exception closing consumer property file");
+ }
+ }
+
+ assertFalse(testProps.isEmpty());
+
+ assertEquals(testProps.get("host"), "192.168.10.10");
+ assertEquals(testProps.get("group"), "junit-client");
+ assertEquals(testProps.get("id"), "junit-consumer-one");
+ assertEquals(testProps.get("filter"), "none");
+ assertEquals(testProps.get("TransportType"), "HTTPNOAUTH");
+ }
+
+ @Test
+ public void testCreateConsumerPropFileWithCustomProfile() {
+ String topic = "JunitTopicOne";
+ Properties junitProps = new Properties();
+ junitProps.put("host", "192.168.10.10");
+ junitProps.put("group", "junit-client");
+ junitProps.put("id", "junit-consumer-two");
+ junitProps.put("filter", "none");
+
+ String junitFile = null;
+
+ // set property for DMaaP profile
+ System.setProperty(DmaapUtil.DMAAP_PROPERTIES_PATH, "src/test/resources/org/onap/appc");
+
+ // set configuration to null to force reloading of properties
+ try {
+ configField.set(null, null);
+ } catch (IllegalArgumentException | IllegalAccessException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+
+ try {
+ junitFile = DmaapUtil.createConsumerPropFile(topic, junitProps);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Exception creating consumer property file");
+ }
+
+ assertNotNull(junitFile);
+
+ // open file and verify properties
+ File testFile = new File(junitFile);
+ assertTrue(testFile.exists());
+
+ InputStream is = null;
+ Properties testProps = new Properties();
+ try {
+ is = new FileInputStream(testFile);
+ testProps.load(is);
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ fail("Exception opening consumer property file");
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Exception opening consumer property file");
+ } finally {
+ try {
+ if (is != null) {
+ is.close();
+ }
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ fail("Exception closing consumer property file");
+ }
+ }
+
+ assertFalse(testProps.isEmpty());
+
+ assertEquals(testProps.get("host"), "192.168.10.10");
+ assertEquals(testProps.get("group"), "junit-client");
+ assertEquals(testProps.get("id"), "junit-consumer-two");
+ assertEquals(testProps.get("filter"), "none");
+ assertEquals(testProps.get("TransportType"), "HTTPAAF");
+ }
+
+ @Test
+ public void testCreateProducerPropFile() {
+ String topic = "JunitTopicOne";
+ Properties junitProps = new Properties();
+ junitProps.put("host", "192.168.10.10");
+ junitProps.put("group", "junit-client");
+ junitProps.put("id", "junit-producer-one");
+ junitProps.put("filter", "none");
+
+ String junitFile = null;
+
+ // ensure file path property is not set
+ if (System.getProperty(DmaapUtil.DMAAP_PROPERTIES_PATH) != null) {
+ System.clearProperty(DmaapUtil.DMAAP_PROPERTIES_PATH);
+
+ // set configuration to null to force reloading of properties
+ try {
+ configField.set(null, null);
+ } catch (IllegalArgumentException | IllegalAccessException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+ }
+
+ try {
+ junitFile = DmaapUtil.createProducerPropFile(topic, junitProps);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Exception creating consumer property file");
+ }
+
+ assertNotNull(junitFile);
+
+ // open file and verify properties
+ File testFile = new File(junitFile);
+ assertTrue(testFile.exists());
+
+ InputStream is = null;
+ Properties testProps = new Properties();
+ try {
+ is = new FileInputStream(testFile);
+ testProps.load(is);
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ fail("Exception opening consumer property file");
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Exception opening consumer property file");
+ } finally {
+ try {
+ if (is != null) {
+ is.close();
+ }
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ fail("Exception closing consumer property file");
+ }
+ }
+
+ assertFalse(testProps.isEmpty());
+
+ assertEquals(testProps.get("host"), "192.168.10.10");
+ assertEquals(testProps.get("group"), "junit-client");
+ assertEquals(testProps.get("id"), "junit-producer-one");
+ assertEquals(testProps.get("filter"), "none");
+ assertEquals("HTTPNOAUTH", testProps.get("TransportType"));
+ }
+
+ /**
+ * Use reflection to locate fields and methods so that they can be
+ * manipulated during the test to change the internal state accordingly.
+ *
+ * @throws NoSuchFieldException
+ * if the field(s) dont exist
+ * @throws SecurityException
+ * if reflective access is not allowed
+ * @throws NoSuchMethodException
+ * If the method(s) dont exist
+ */
+ @SuppressWarnings("nls")
+ @BeforeClass
+ public static void once() throws NoSuchFieldException, SecurityException, NoSuchMethodException {
+ configurationFactoryClass = ConfigurationFactory.class;
+
+ configField = configurationFactoryClass.getDeclaredField("config");
+ configField.setAccessible(true);
+ }
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/DCAEResponse.txt b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/DCAEResponse.txt
new file mode 100644
index 000000000..884a8525f
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/DCAEResponse.txt
@@ -0,0 +1,20 @@
+{
+"requestTime": "0000-00-00 14:31:19.653787",
+"policyVersion": "13",
+"VMName": "123",
+"from": "test",
+"msgOid": ".1.3.6.1.4.1.193.183.4.1.3.5.1.4",
+"trapID": "1234567",
+"requestClient": "test",
+"message": "Abnormal condition detected",
+"time": "123567890",
+"policyName": "RESTART",
+"trapIDOID": ".1.3.6.1.4.1.193.183.4.1.3.5.1.3",
+"request": "Restart",
+"OPS_CL_timer": "15",
+"nOID": ".1.3.6.1.4.1.193.183.4.2.0.4",
+"AgentAddress": "192.168.1.2",
+"vmOID": ".1.3.6.1.4.1.193.183.4.1.2.1",
+"AICTenantID": "0123456789abcdef0123456789",
+"AICVServerSelfLink": "http://somewhere"
+} \ No newline at end of file
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/consumer.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/consumer.properties
new file mode 100644
index 000000000..709619452
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/consumer.properties
@@ -0,0 +1,54 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH >
+TransportType=HTTPAAF
+Latitude =50.000000
+Longitude =-100.000000
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events
+Environment =TEST
+Partner=BOT_R
+routeOffer=MR1
+SubContextPath =/
+Protocol =http
+MethodType =GET
+contenttype =application/json
+#authKey=01234567890abcde:01234567890abcdefghijklmn
+#authDate=2016-02-18T13:57:37-0800
+host=127.0.0.1
+topic=org.onap.appc.UNIT-TEST
+group=jmsgrp
+id=2
+timeout=15000
+limit=1000
+filter=
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=preferredRoute.txt
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/default.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/default.properties
new file mode 100644
index 000000000..8dea0a61d
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/default.properties
@@ -0,0 +1,38 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+org.onap.appc.bootstrap.file=test.properties
+org.onap.appc.bootstrap.path=/opt/onap/appc/data/properties,${user.home},.
+
+# Properties commented out below are provided in appc.properties
+poolMembers=10.0.0.1
+#event.pool.members=<DMAAP_IP>:3904
+
+topic.read=APPC-CL
+topic.read.timeout=5
+topic.write=APPC-CL
+event.topic.write=APPC-CL
+client.name=APPC-CLIENT-DMAAP-ADAPTER-TEST
+client.name.id=0
+
+metric.enabled=false;
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/producer.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/producer.properties
new file mode 100644
index 000000000..9cc7f2e55
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/producer.properties
@@ -0,0 +1,52 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH >
+TransportType=HTTPAAF
+Latitude =50.000000
+Longitude =-100.000000
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events
+Environment =TEST
+Partner=BOT_R
+SubContextPath =/
+Protocol =http
+MethodType =POST
+contenttype = application/json
+authKey=01234567890abcde:01234567890abcdefghijklmn
+authDate=2016-07-20T11:30:56-0700
+host=127.0.0.1
+topic=org.onap.appc.UNIT-TEST
+partition=2
+maxBatchSize=100
+maxAgeMs=250
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=preferredRoute.txt
+MessageSentThreadOccurance=50
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/test.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/test.properties
new file mode 100644
index 000000000..a451739bf
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/test.properties
@@ -0,0 +1,37 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+# Properties commented out below are provided in appc.properties
+#poolMembers=<DMAAP_IP>:3904
+#event.pool.members=<DMAAP_IP>:3904
+
+topic.read=APPC-TEST2
+topic.read.timeout=5
+topic.write=APPC-TEST2
+event.topic.write=APPC-TEST2
+client.name=APPC-CLIENT-DMAAP-ADAPTER-TEST
+client.name.id=0
+#client.key=fakeKey
+#client.secret=fakeSecret
+#event.client.key=fakeKey
+#event.client.secret=fakeSecret