aboutsummaryrefslogtreecommitdiffstats
path: root/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src
diff options
context:
space:
mode:
authorPatrick Brady <patrick.brady@att.com>2019-10-10 15:34:00 -0700
committerTakamune Cho <takamune.cho@att.com>2019-11-08 16:38:47 +0000
commit6a6d3afd489ea3b8945f1cdd8a8a74afdcc5221a (patch)
tree1b9b67b9683145d693d831eea7128d36738aa9a4 /services/appc-dmaap-service/appc-dmaap-adapter-bundle/src
parent7b04a1754482ba02a49d4d6376dc45e4cd6551f4 (diff)
Dmaap micro service jar
Creating a service running in a standalone jar to handle the publishing and recieving of dmaap messages for appc. Dmaap adapter and event listener code is copeid from the main appc project. It will be moved in a later commit. Change-Id: I3fa7b5dc60345f0f38f763a243150b8472f985ac Signed-off-by: Patrick Brady <patrick.brady@att.com> Issue-ID: APPC-1744
Diffstat (limited to 'services/appc-dmaap-service/appc-dmaap-adapter-bundle/src')
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java104
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java28
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java105
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java154
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java140
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/utils/DmaapUtil.java155
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties56
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/org/onap/appc/default.properties27
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt22
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/producer.properties54
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java46
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java83
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java79
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestAuthenticationException.java35
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestCommonHttpClient.java118
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapConsumerImpl.java153
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapProducerImpl.java135
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/utils/TestDmaapUtil.java268
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/DCAEResponse.txt20
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/consumer.properties54
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/default.properties38
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/producer.properties52
-rw-r--r--services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/test.properties37
23 files changed, 1963 insertions, 0 deletions
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java
new file mode 100644
index 000000000..828cdb773
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java
@@ -0,0 +1,104 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+
+
+
+/**
+ * This activator is used to initialize and terminate the connection pool to one or more providers.
+ * <p>
+ * The CDP abstraction layer supports multiple types of providers, with each provider supporting multiple tenants. The
+ * "connection" to a specific tenant on a specific provider is represented by a "context" object. These context objects
+ * are authenticated to a specific tenant on the provider, but can be reused from one request to another. Contexts are
+ * slow to set up and are resource intensive, so they are cached. However, the contexts for a specific tenant on a
+ * specific provider must be cached separately.
+ * </p>
+ * <p>
+ * Activation of the bundle creates an empty cache which is organized first by provider type, then by tenant name, with
+ * the contents being an empty pool of contexts for that provider/tenant combination. The pool is created on first use,
+ * and retained for as long as the bundle is active.
+ * </p>
+ * <p>
+ * When the bundle is deactivated, the cache is torn down with all contexts being closed.
+ * </p>
+ */
+public class AppcDmaapAdapterActivator implements BundleActivator {
+
+ /**
+ * The logger to be used
+ */
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AppcDmaapAdapterActivator.class);
+
+ /**
+ * Called when this bundle is started so the Framework can perform the bundle-specific activities necessary to start
+ * this bundle. This method can be used to register services or to allocate any resources that this bundle needs.
+ * <p>
+ * This method must complete and return to its caller in a timely manner.
+ * </p>
+ *
+ * @param bundleContext
+ * The execution context of the bundle being started.
+ * @throws java.lang.Exception
+ * If this method throws an exception, this bundle is marked as stopped and the Framework will remove
+ * this bundle's listeners, unregister all services registered by this bundle, and release all services
+ * used by this bundle.
+ * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext)
+ */
+ @Override
+ public void start(final BundleContext bundleContext) throws Exception {
+ LOG.info("Starting Bundle " + getName());
+ }
+
+ /**
+ * Called when this bundle is stopped so the Framework can perform the bundle-specific activities necessary to stop
+ * the bundle. In general, this method should undo the work that the BundleActivator.start method started. There
+ * should be no active threads that were started by this bundle when this bundle returns. A stopped bundle must not
+ * call any Framework objects.
+ * <p>
+ * This method must complete and return to its caller in a timely manner.
+ * </p>
+ *
+ * @param ctx
+ * The execution context of the bundle being stopped.
+ * @throws java.lang.Exception
+ * If this method throws an exception, the bundle is still marked as stopped, and the Framework will
+ * remove the bundle's listeners, unregister all services registered by the bundle, and release all
+ * services used by the bundle. *
+ * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext)
+ */
+ @Override
+ public void stop(BundleContext ctx) throws Exception {
+ LOG.info("Stopped Bundle " + getName());
+ }
+
+ public String getName() {
+ return "DMaaP Adapter";
+ }
+
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java
new file mode 100644
index 000000000..f670d6cdd
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/AuthenticationException.java
@@ -0,0 +1,28 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+class AuthenticationException extends Exception {
+
+ public AuthenticationException(String message) {
+ super(message);
+ }
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
new file mode 100644
index 000000000..7c243d08e
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
@@ -0,0 +1,105 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import java.net.URI;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.config.RequestConfig.Builder;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+abstract class CommonHttpClient {
+
+ private static final int HTTP_PORT = 3904;
+ private static final int HTTPS_PORT = 3905;
+ private static final int TIMEOUT_OFFSET = 5000;
+
+ private String authStr;
+
+ protected void setBasicAuth(String username, String password) {
+ if (username != null && password != null) {
+ String plain = String.format("%s:%s", username, password);
+ authStr = Base64.encodeBase64String(plain.getBytes());
+ } else {
+ authStr = null;
+ }
+ }
+
+ protected HttpGet getReq(URI uri, int timeoutMs) throws AuthenticationException {
+
+ HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri);
+ if (authStr != null) {
+ out.setHeader("Authorization", String.format("Basic %s", authStr));
+ }
+
+ out.setConfig(getConfig(timeoutMs));
+ return out;
+ }
+
+ protected HttpPost postReq(String url) throws AuthenticationException {
+
+ HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url);
+ if (authStr != null) {
+ out.setHeader("Authorization", String.format("Basic %s", authStr));
+ }
+ out.setConfig(getConfig(0));
+ return out;
+ }
+
+ private RequestConfig getConfig(int timeoutMs) {
+ Builder builder = RequestConfig.custom();
+ builder.setSocketTimeout(timeoutMs + TIMEOUT_OFFSET);
+ return builder.build();
+ }
+
+ protected CloseableHttpClient getClient() {
+ return HttpClientBuilder.create().build();
+ }
+
+ protected String formatHostString(String host) {
+ return formatHostString(host, host.contains(String.valueOf(HTTPS_PORT)));
+ }
+
+ private String formatHostString(String host, boolean useHttps) {
+ // Trim trailing slash
+ String out = host.endsWith("/") ? host.substring(0, host.length() - 1) : host;
+
+ boolean hasProtocol = out.startsWith("http");
+ boolean hasPort = out.contains(":");
+
+ // Add protocol
+ if (!hasProtocol) {
+ out = String.format("%s%s", (useHttps) ? "https://" : "http://", out);
+ }
+ // Add port
+ if (!hasPort) {
+ out = String.format("%s:%d", out, (useHttps) ? HTTPS_PORT : HTTP_PORT);
+ }
+ return out;
+ }
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
new file mode 100644
index 000000000..e2a20ee45
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
@@ -0,0 +1,154 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.http.HttpEntity;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+import org.json.JSONArray;
+import org.onap.appc.adapter.message.Consumer;
+
+public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer {
+
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
+
+ // Default values
+ private static final int DEFAULT_TIMEOUT_MS = 15000;
+ private static final int DEFAULT_LIMIT = 1000;
+ private static final String URL_TEMPLATE = "%s/events/%s/%s/%s";
+
+ private List<String> urls;
+ private String filter;
+
+ public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
+ String filter) {
+
+ this(hosts, topicName, consumerName, consumerId, filter, null, null);
+ }
+
+ public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
+ String filter, String user, String password) {
+
+ urls = new ArrayList<>();
+ for (String host : hosts) {
+ urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
+ }
+ this.filter = filter;
+ updateCredentials(user, password);
+ }
+
+ @Override
+ public void updateCredentials(String user, String pass) {
+ LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
+ this.setBasicAuth(user, pass);
+ }
+
+ @Override
+ public List<String> fetch(int waitMs, int limit) {
+ LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
+ List<String> out = new ArrayList<>();
+ try {
+ List<NameValuePair> urlParams = new ArrayList<>();
+ urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs)));
+ urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit)));
+ if (filter != null) {
+ urlParams.add(new BasicNameValuePair("filter", filter));
+ }
+
+ URIBuilder builder = new URIBuilder(urls.get(0));
+ builder.setParameters(urlParams);
+
+ URI uri = builder.build();
+ LOG.info(String.format("GET %s", uri));
+ HttpGet request = getReq(uri, waitMs);
+ CloseableHttpResponse response = getClient().execute(request);
+
+ int httpStatus = response.getStatusLine().getStatusCode();
+ HttpEntity entity = response.getEntity();
+ String body = (entity != null) ? entityToString(entity) : null;
+
+ LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
+ body != null ? body.length() : "null"));
+
+ response.close();
+ if (httpStatus == 200 && body != null) {
+ JSONArray json = new JSONArray(body);
+ LOG.info(String.format("Got %d messages from DMaaP", json.length()));
+ for (int i = 0; i < json.length(); i++) {
+ out.add(json.getString(i));
+ }
+ } else {
+ LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
+ sleep(waitMs);
+ }
+ } catch (Exception e) {
+ if (urls.size() > 1) {
+ String failedUrl = urls.remove(0);
+ urls.add(failedUrl);
+ LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
+ urls.get(0)));
+ }
+ LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
+ sleep(waitMs);
+ }
+ return out;
+ }
+
+ @Override
+ public List<String> fetch() {
+ return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
+ }
+
+ @Override
+ public String toString() {
+ String hostStr = (urls == null || urls.isEmpty()) ? "N/A" : urls.get(0);
+ return String.format("Consumer listening to [%s]", hostStr);
+ }
+
+ String entityToString(HttpEntity entity) throws IOException {
+ return EntityUtils.toString(entity);
+ }
+
+ private void sleep(int ms) {
+ LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000));
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e1) {
+ LOG.error("Interrupted while sleeping", e1);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
new file mode 100644
index 000000000..84b5a5ff6
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
@@ -0,0 +1,140 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.onap.appc.adapter.message.Producer;
+
+public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer {
+
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class);
+
+ private static final String CONTENT_TYPE = "application/cambria";
+ private static final String URL_TEMPLATE = "%s/events/%s";
+
+ private List<String> hosts;
+ private Set<String> topics;
+
+ public HttpDmaapProducerImpl() {
+ //for test purposes
+ }
+
+ public HttpDmaapProducerImpl(Collection<String> urls, String topicName, String username, String password) {
+ this(urls, topicName);
+ updateCredentials(username, password);
+ }
+
+ public HttpDmaapProducerImpl(Collection<String> urls, String topicName) {
+ topics = new HashSet<>();
+ topics.add(topicName);
+
+ hosts = new ArrayList<>();
+ for (String host : urls) {
+ hosts.add(formatHostString(host));
+ }
+ }
+
+ @Override
+ public void updateCredentials(String user, String pass) {
+ LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
+ this.setBasicAuth(user, pass);
+ }
+
+ @Override
+ public boolean post(String partition, String data) {
+ LOG.debug("Entering HttpDmaapProducerImpl::: post ");
+ long sent = 0;
+ try {
+ HttpPost request = postReq(null);
+ request.setHeader("Content-Type", CONTENT_TYPE);
+ request.setEntity(new StringEntity(bodyLine(partition, data)));
+
+ LOG.debug("Before sendRequest HttpDmaapProducerImpl::: post ");
+ sent = topics.stream()
+ .filter(topic -> sendRequest(request, topic))
+ .count();
+
+ } catch (Exception buildEx) {
+ LOG.error(
+ String.format("Failed to build request with string [%s]. Message not sent to any topic. Reason: %s",
+ data, buildEx.getMessage()),
+ buildEx);
+ }
+ LOG.debug("Exiting HttpDmaapProducerImpl::: post ");
+ return sent == topics.size();
+ }
+
+ private boolean sendRequest(HttpPost request, String topic) {
+ boolean successful = false;
+ String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic);
+ try {
+ request.setURI(new URI(uriStr));
+ LOG.debug("HttpDmaapProducerImpl::: before sendRequest()");
+ CloseableHttpResponse response = getClient().execute(request);
+ LOG.debug("HttpDmaapProducerImpl::: after sendRequest()");
+ if (response.getStatusLine().getStatusCode() == 200) {
+ successful = true;
+ }
+ else {
+ LOG.debug("HttpDmaapProducerImpl::: did not receive 200 for sendRequest");
+ }
+ response.close();
+ } catch (Exception sendEx) {
+ LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()),
+ sendEx);
+ if (hosts.size() > 1) {
+ String failedUrl = hosts.remove(0);
+ hosts.add(failedUrl);
+ LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s",
+ failedUrl, hosts.get(0)));
+ }
+ }
+ return successful;
+ }
+
+ /**
+ * Format the body for the application/cambria content type with no partitioning. See
+ *
+ * @param message
+ * The message body to format
+ * @return A string in the application/cambria content type
+ */
+ private String bodyLine(String partition, String message) {
+ String prt = (partition == null) ? "" : partition;
+ String msg = (message == null) ? "" : message;
+ return String.format("%d.%d.%s%s", prt.length(), msg.length(), prt, msg);
+ }
+
+
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/utils/DmaapUtil.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/utils/DmaapUtil.java
new file mode 100644
index 000000000..af20ac7c6
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/utils/DmaapUtil.java
@@ -0,0 +1,155 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.onap.appc.configuration.Configuration;
+import org.onap.appc.configuration.ConfigurationFactory;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+public class DmaapUtil {
+
+ private static final char DELIMITER = '_';
+
+ static final String DMAAP_PROPERTIES_PATH = "org.onap.appc.dmaap.profile.path";
+
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DmaapUtil.class);
+
+ private DmaapUtil() {
+ }
+
+ private static String createPreferredRouteFileIfNotExist(String topic) throws IOException {
+ String topicPreferredRouteFileName;
+ topicPreferredRouteFileName = topic + "preferredRoute.properties";
+ File fo = new File(topicPreferredRouteFileName);
+ if (!fo.exists()) {
+ ClassLoader classLoader = DmaapUtil.class.getClassLoader();
+ InputStream inputStream = classLoader.getResourceAsStream("preferredRoute.txt");
+ Properties props = new Properties();
+ props.load(inputStream);
+ String fileName = topic != null ? topic + DELIMITER + "MR1" : DELIMITER + "MR1";
+ props.setProperty("preferredRouteKey", fileName);
+ topicPreferredRouteFileName = topic + "preferredRoute.properties";
+ props.store(new FileOutputStream(topicPreferredRouteFileName),
+ "preferredRoute.properties -This file is generated automatically for topic:" + topic + " on:"
+ + System.currentTimeMillis());
+ }
+ return topicPreferredRouteFileName;
+ }
+
+ public static String createConsumerPropFile(String topic, Properties props) throws IOException {
+ String defaultProfFileName = "consumer.properties";
+
+ log.debug("Creating DMaaP Consumer Property File for topic " + topic);
+ return createConsumerProducerPropFile(topic, defaultProfFileName, props);
+ }
+
+ public static String createProducerPropFile(String topic, Properties props) throws IOException {
+ String defaultProfFileName = "producer.properties";
+
+ log.debug("Creating DMaaP Producer Property File for topic " + topic);
+ return createConsumerProducerPropFile(topic, defaultProfFileName, props);
+ }
+
+ private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props)
+ throws IOException {
+ Properties defaultProps = getDefaultProperties(defaultProfFileName);
+
+ defaultProps.setProperty("topic", topic);
+
+ String preferredRouteFileName = DmaapUtil.createPreferredRouteFileIfNotExist(topic);
+ if (props != null && !props.isEmpty()) {
+ defaultProps.putAll(props);
+ }
+ defaultProps.setProperty("topic", topic);
+ defaultProps.setProperty("DME2preferredRouterFilePath", preferredRouteFileName);
+ String id = defaultProps.getProperty("id");
+ String topicConsumerPropFileName = defaultProfFileName;
+ topicConsumerPropFileName = id != null ? id + DELIMITER + topicConsumerPropFileName
+ : DELIMITER + topicConsumerPropFileName;
+ topicConsumerPropFileName = topic != null ? topic + DELIMITER + topicConsumerPropFileName
+ : DELIMITER + topicConsumerPropFileName;
+
+ defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName
+ + " - This file is generated automatically for topic:" + topic + " on:" + System.currentTimeMillis());
+ return topicConsumerPropFileName;
+ }
+
+ private static Properties getDefaultProperties(String profileName) {
+ Properties props = new Properties();
+
+ // use appc configuration to get all properties which includes
+ // appc.properties and system properties
+ // allowing variable to be set in any location
+ Configuration config = ConfigurationFactory.getConfiguration();
+ String dmaapPropPath = config.getProperty(DMAAP_PROPERTIES_PATH);
+
+ if (dmaapPropPath != null) {
+ // load from file system
+
+ File profileFile = new File(dmaapPropPath, profileName);
+ FileInputStream inputStream = null;
+
+ log.info("Loading DMaaP Profile from " + profileFile.getAbsolutePath());
+
+ if (profileFile.exists()) {
+ try {
+ inputStream = new FileInputStream(profileFile);
+ props.load(inputStream);
+ } catch (IOException e) {
+ log.error("Exception loading DMaaP Profile from " + profileFile.getAbsolutePath(), e);
+ } finally {
+ try {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ } catch (IOException ex) {
+ log.warn("Exception closing DMaaP Profile file " + profileFile.getAbsolutePath(), ex);
+ }
+ }
+ }
+ }
+ if (props.isEmpty()) {
+ // load default Profile from class
+ log.info("Loading Default DMaaP Profile");
+
+ ClassLoader classLoader = DmaapUtil.class.getClassLoader();
+ InputStream inputStream = classLoader.getResourceAsStream(profileName);
+ try {
+ props.load(inputStream);
+ } catch (IOException e) {
+ log.error("Exception loading Default DMaaP Profile", e);
+ }
+ }
+
+ return props;
+ }
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties
new file mode 100644
index 000000000..b19a33505
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/consumer.properties
@@ -0,0 +1,56 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH >
+TransportType=HTTPNOAUTH
+Latitude =50.000000
+Longitude =-100.000000
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events
+Environment =TEST
+Partner=BOT_R
+routeOffer=MR1
+SubContextPath =/
+Protocol =http
+MethodType =GET
+username =admin
+password =admin
+contenttype =application/json
+authKey=01234567890abcde:01234567890abcdefghijklmn
+authDate=2016-02-18T13:57:37-0800
+host=127.0.0.1
+topic=org.onap.appc.UNIT-TEST
+group=jmsgrp
+id=2
+timeout=15000
+limit=1000
+filter=
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=preferredRoute.txt
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/org/onap/appc/default.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/org/onap/appc/default.properties
new file mode 100644
index 000000000..218cafe6e
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/org/onap/appc/default.properties
@@ -0,0 +1,27 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+# ${user.home} usually goes to /root if instantiation uses the appc-docker approach
+
+org.onap.appc.bootstrap.file=appc.properties
+org.onap.appc.bootstrap.path=/opt/onap/appc/data/properties,${user.home},.
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt
new file mode 100644
index 000000000..7e6ed8bdf
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/preferredRoute.txt
@@ -0,0 +1,22 @@
+# ============LICENSE_START==========================================
+# ONAP : APPC
+# ===================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the License);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+# ============LICENSE_END============================================
+preferredRouteKey=MR1
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/producer.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/producer.properties
new file mode 100644
index 000000000..129ec9c6b
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/main/resources/producer.properties
@@ -0,0 +1,54 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH >
+TransportType=HTTPNOAUTH
+Latitude =50.000000
+Longitude =-100.000000
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events
+Environment =TEST
+Partner=BOT_R
+SubContextPath =/
+Protocol =http
+MethodType =POST
+username =admin
+password =admin
+contenttype = application/json
+authKey=01234567890abcde:01234567890abcdefghijklmn
+authDate=2016-07-20T11:30:56-0700
+host=127.0.0.1
+topic=org.onap.appc.UNIT-TEST
+partition=2
+maxBatchSize=100
+maxAgeMs=250
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=preferredRoute.txt
+MessageSentThreadOccurance=50
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java
new file mode 100644
index 000000000..e5ee0680c
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestAppcDmaapAdapterActivator.java
@@ -0,0 +1,46 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+import org.onap.appc.adapter.messaging.dmaap.AppcDmaapAdapterActivator;
+
+public class TestAppcDmaapAdapterActivator {
+
+ @Test
+ public void test_dmaap_activator() {
+ // This does nothing since the activator does nothing
+ AppcDmaapAdapterActivator appc = new AppcDmaapAdapterActivator();
+ try {
+ appc.start(null);
+ appc.stop(null);
+ } catch (Exception e) {
+ fail("Got exception when starting stopping. " + e.getMessage());
+ }
+ assertNotNull(appc.getName());
+ }
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java
new file mode 100644
index 000000000..ec9740053
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapConsuming.java
@@ -0,0 +1,83 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap;
+
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.appc.adapter.message.Consumer;
+import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapConsumerImpl;
+import org.onap.appc.configuration.Configuration;
+import org.onap.appc.configuration.ConfigurationFactory;
+import org.junit.Ignore;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Must have a DMaaP cluster or simulator up and running
+ * Update the hostname, topic, client properties in
+ * resources/org/onap/appc/default.properties
+ *
+ */
+public class TestDmaapConsuming {
+
+ private static Consumer httpConsumer;
+
+ @BeforeClass
+ public static void setUp() {
+
+ Configuration configuration = ConfigurationFactory.getConfiguration();
+
+ List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(","));
+ String topic = configuration.getProperty("topic.read");
+ String consumerName = configuration.getProperty("client.name");
+ String consumerId = configuration.getProperty("client.name.id");
+ String msgFilter = configuration.getProperty("message.filter");
+ String user = configuration.getProperty("dmaap.appc.username");
+ String password = configuration.getProperty("dmaap.appc.password");
+
+ httpConsumer = new HttpDmaapConsumerImpl(hosts, topic, consumerName, consumerId, msgFilter);
+ }
+
+ @Test
+ @Ignore
+ public void testHttpFetchMessages() {
+ testFetchMessages(httpConsumer);
+ }
+
+ @Test
+ @Ignore
+ public void testFetchMessages() {
+ testFetchMessages(httpConsumer);
+ }
+
+ private void testFetchMessages(Consumer consumer) {
+ List<String> messages = consumer.fetch(1000, 100);
+ Assert.assertNotNull(messages);
+ Assert.assertFalse(messages.isEmpty());
+ }
+
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java
new file mode 100644
index 000000000..5b2183e37
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/TestDmaapProducing.java
@@ -0,0 +1,79 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap;
+
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onap.appc.adapter.message.Producer;
+import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl;
+import org.onap.appc.configuration.Configuration;
+import org.onap.appc.configuration.ConfigurationFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Must have a DMaaP cluster or simulator up and running
+ * Update the hostname, topic, client properties in
+ * resources/org/onap/appc/default.properties
+ *
+ */
+public class TestDmaapProducing {
+
+ private static Producer httpProducer;
+
+ @BeforeClass
+ public static void setUp() {
+
+ Configuration configuration = ConfigurationFactory.getConfiguration();
+
+ List<String> hosts = Arrays.asList(configuration.getProperty("poolMembers").split(","));
+ String topic = configuration.getProperty("topic.write");
+ String user = configuration.getProperty("dmaap.appc.username");
+ String password = configuration.getProperty("dmaap.appc.password");
+
+ httpProducer = new HttpDmaapProducerImpl(hosts, topic);
+ httpProducer.updateCredentials(user,password);
+ }
+
+ @Test
+ @Ignore
+ public void testHttpPostMessage() {
+ testPostMessage(httpProducer);
+ }
+
+ @Test
+ @Ignore
+ public void testPostMessages() {
+ testPostMessage(httpProducer);
+ }
+
+ private void testPostMessage(Producer producer) {
+ Assert.assertTrue(producer.post("partition", "{\"message\": \"Hello, world!\"}"));
+ }
+
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestAuthenticationException.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestAuthenticationException.java
new file mode 100644
index 000000000..448af3022
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestAuthenticationException.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START==========================================
+ * org.onap.music
+ * ===================================================================
+ * Copyright (c) 2019 IBM.
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAuthenticationException {
+
+ @Test
+ public void testAuthException() {
+ AuthenticationException authException = new AuthenticationException("AuthenticationException");
+ Assert.assertEquals("AuthenticationException", authException.getMessage());
+ }
+
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestCommonHttpClient.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestCommonHttpClient.java
new file mode 100644
index 000000000..660a6e4bc
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestCommonHttpClient.java
@@ -0,0 +1,118 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import static org.junit.Assert.*;
+
+import java.net.URI;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCommonHttpClient {
+
+ private static final String HTTP = "http://";
+ private static final String HTTPS = "https://";
+ private static final String URL = "example.org/location";
+ private static final URI URI = java.net.URI.create(HTTP + URL);
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final int TIMEOUT = 15000;
+ private static final int TIMEOUT_OFFSET = 5000;
+ private static final int HTTP_PORT = 3904;
+ private static final int HTTPS_PORT = 3905;
+
+ private CommonHttpClient commonHttpClient;
+
+ @Before
+ public void setUp() {
+ commonHttpClient = new CommonHttpClient() {};
+ }
+
+ private void setBasicAuth() {
+ commonHttpClient.setBasicAuth(USERNAME, PASSWORD);
+ }
+
+ private void noBasicAuth() {
+ commonHttpClient.setBasicAuth(null, null);
+ }
+
+ @Test
+ public void shouldGetHttpRequest_whenSetBasicAuth() throws AuthenticationException {
+
+ setBasicAuth();
+
+ HttpGet httpGet = commonHttpClient.getReq(URI, TIMEOUT);
+
+ assertNotNull(httpGet);
+ assertNotNull(httpGet.getFirstHeader("Authorization"));
+ assertNotNull(httpGet.getConfig());
+ assertEquals(httpGet.getConfig().getSocketTimeout(), TIMEOUT + TIMEOUT_OFFSET);
+ }
+
+ @Test
+ public void shouldPostHttpRequest_whenSetBasicAuth() throws AuthenticationException {
+
+ setBasicAuth();
+
+ HttpPost httpPost = commonHttpClient.postReq(URL);
+
+ assertNotNull(httpPost);
+ assertNotNull(httpPost.getFirstHeader("Authorization"));
+ assertNotNull(httpPost.getConfig());
+ assertEquals(httpPost.getConfig().getSocketTimeout(), TIMEOUT_OFFSET);
+ }
+
+ @Test
+ public void shouldGetClient() {
+ assertNotNull(commonHttpClient.getClient());
+ }
+
+ @Test
+ public void shouldFormatHostString() {
+ String httpUrl = HTTP + URL + ":" + HTTP_PORT;
+ String httpsUrl = HTTPS + URL + ":" + HTTPS_PORT;
+ String outputUrl;
+
+ outputUrl = commonHttpClient.formatHostString(httpUrl);
+ assertTrue(assertMessage(httpUrl, outputUrl), httpUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(httpsUrl);
+ assertTrue(assertMessage(httpsUrl, outputUrl), httpsUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(httpsUrl + "/");
+ assertTrue(assertMessage(httpsUrl, outputUrl), httpsUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(URL + ":" + HTTP_PORT);
+ assertTrue(assertMessage(httpUrl, outputUrl), httpUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(URL + ":" + HTTPS_PORT);
+ assertTrue(assertMessage(httpsUrl, outputUrl), httpsUrl.equals(outputUrl));
+
+ outputUrl = commonHttpClient.formatHostString(URL);
+ assertTrue(assertMessage(httpUrl, outputUrl), httpUrl.equals(outputUrl));
+ }
+
+ private String assertMessage(String expected, String actual) {
+ return "Expected: " + expected + " Actual: " + actual;
+ }
+} \ No newline at end of file
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapConsumerImpl.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapConsumerImpl.java
new file mode 100644
index 000000000..ea9a5e921
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapConsumerImpl.java
@@ -0,0 +1,153 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import static javax.ws.rs.core.Response.Status.FORBIDDEN;
+import static javax.ws.rs.core.Response.Status.OK;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import org.apache.http.HttpEntity;
+import org.apache.http.StatusLine;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+
+public class TestHttpDmaapConsumerImpl {
+
+ private static final Collection<String> URLS = Arrays.asList("test.com", "test.org");
+ private static final Collection<String> OUTPUT_MSG = Arrays.asList("FirstMessage", "SecondMessage");
+ private static final String TOPIC_NAME = "Topic";
+ private static final String CONSUMER_NAME = "Consumer";
+ private static final String CONSUMER_ID = "Id";
+ private static final String FILTER = "filter";
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final String MESSAGE_BODY = "[FirstMessage, SecondMessage]";
+ private static final int TIMEOUT_MS = 1000;
+ private static final int LIMIT = 1000;
+
+ @Spy
+ private HttpDmaapConsumerImpl httpDmaapConsumer;
+
+ @Mock
+ private CloseableHttpClient httpClient;
+
+ @Mock
+ private CloseableHttpResponse httpResponse;
+
+ @Mock
+ private StatusLine statusLine;
+
+ @Mock
+ private HttpEntity entity;
+
+ @Before
+ public void setUp() throws Exception {
+ httpDmaapConsumer = new HttpDmaapConsumerImpl(URLS, TOPIC_NAME, CONSUMER_NAME, CONSUMER_ID, FILTER);
+ httpDmaapConsumer.updateCredentials(USERNAME, PASSWORD);
+
+ MockitoAnnotations.initMocks(this);
+ doReturn(httpClient).when(httpDmaapConsumer).getClient();
+ when(httpClient.execute(any(HttpGet.class))).thenReturn(httpResponse);
+ when(httpResponse.getStatusLine()).thenReturn(statusLine);
+ when(httpResponse.getEntity()).thenReturn(entity);
+ doReturn(MESSAGE_BODY).when(httpDmaapConsumer).entityToString(same(entity));
+ }
+
+ @Test
+ public void shouldGetHttpRequest() throws Exception {
+
+ when(statusLine.getStatusCode()).thenReturn(OK.getStatusCode());
+
+ List<String> output = httpDmaapConsumer.fetch();
+
+ assertFalse(output.isEmpty());
+ assertTrue(output.containsAll(OUTPUT_MSG));
+ verify(httpClient).execute(any(HttpGet.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).getEntity();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine, entity);
+ }
+
+ @Test
+ public void shouldNotBeSuccessful_whenHttpResponseIsOtherThanOk() throws Exception {
+
+ when(statusLine.getStatusCode()).thenReturn(FORBIDDEN.getStatusCode());
+
+ List<String> output = httpDmaapConsumer.fetch(TIMEOUT_MS, LIMIT);
+
+ assertTrue(output.isEmpty());
+ verify(httpClient).execute(any(HttpGet.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).getEntity();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine, entity);
+ }
+
+ @Test
+ public void shouldNotBeSuccessful_whenRequestToOneOfUrlsCannotBeSent() throws Exception {
+
+ when(httpClient.execute(any(HttpGet.class))).thenThrow(new ClientProtocolException());
+
+ List<String> output = httpDmaapConsumer.fetch(TIMEOUT_MS, LIMIT);
+
+ assertTrue(output.isEmpty());
+ verify(httpClient).execute(any(HttpGet.class));
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine, entity);
+
+
+ reset(httpClient);
+ when(httpClient.execute(any(HttpGet.class))).thenReturn(httpResponse);
+ when(statusLine.getStatusCode()).thenReturn(OK.getStatusCode());
+
+ output = httpDmaapConsumer.fetch(TIMEOUT_MS, LIMIT);
+
+ assertFalse(output.isEmpty());
+ assertTrue(output.containsAll(OUTPUT_MSG));
+ verify(httpClient).execute(any(HttpGet.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).getEntity();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine, entity);
+ }
+
+} \ No newline at end of file
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapProducerImpl.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapProducerImpl.java
new file mode 100644
index 000000000..01e41a857
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/http/TestHttpDmaapProducerImpl.java
@@ -0,0 +1,135 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.http;
+
+import static javax.ws.rs.core.Response.Status.FORBIDDEN;
+import static javax.ws.rs.core.Response.Status.OK;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.http.StatusLine;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+
+public class TestHttpDmaapProducerImpl {
+
+ private static final Collection<String> URLS = Arrays.asList("test.com", "test.org");
+ private static final String TOPIC_NAME = "Topic";
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final String PARTITION = "partition";
+ private static final String DATA = "data";
+
+
+ @Spy
+ private HttpDmaapProducerImpl httpDmaapProducer = new HttpDmaapProducerImpl();
+
+ @Mock
+ private CloseableHttpClient httpClient;
+
+ @Mock
+ private CloseableHttpResponse httpResponse;
+
+ @Mock
+ private StatusLine statusLine;
+
+ @Before
+ public void setUp() throws Exception {
+ httpDmaapProducer = new HttpDmaapProducerImpl(URLS, TOPIC_NAME);
+ httpDmaapProducer.updateCredentials(USERNAME, PASSWORD);
+
+ MockitoAnnotations.initMocks(this);
+ doReturn(httpClient).when(httpDmaapProducer).getClient();
+ when(httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+ when(httpResponse.getStatusLine()).thenReturn(statusLine);
+ }
+
+ @Test
+ public void shouldPostHttpRequest() throws Exception {
+
+ when(statusLine.getStatusCode()).thenReturn(OK.getStatusCode());
+
+ boolean successful = httpDmaapProducer.post(PARTITION, DATA);
+
+ assertTrue(successful);
+ verify(httpClient).execute(any(HttpPost.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine);
+ }
+
+ @Test
+ public void shouldNotBeSuccessful_whenHttpResponseIsOtherThanOk() throws Exception {
+
+ when(statusLine.getStatusCode()).thenReturn(FORBIDDEN.getStatusCode());
+
+ boolean successful = httpDmaapProducer.post(PARTITION, DATA);
+
+ assertFalse(successful);
+ verify(httpClient).execute(any(HttpPost.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine);
+ }
+
+ @Test
+ public void shouldNotBeSuccessful_whenRequestToOneOfUrlsCannotBeSent() throws Exception {
+
+ when(httpClient.execute(any(HttpPost.class))).thenThrow(new ClientProtocolException());
+
+ boolean successful = httpDmaapProducer.post(PARTITION, DATA);
+
+ assertFalse(successful);
+ verify(httpClient).execute(any(HttpPost.class));
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine);
+
+
+ reset(httpClient);
+ when(httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+ when(statusLine.getStatusCode()).thenReturn(OK.getStatusCode());
+
+ successful = httpDmaapProducer.post(PARTITION, DATA);
+
+ assertTrue(successful);
+ verify(httpClient).execute(any(HttpPost.class));
+ verify(httpResponse).getStatusLine();
+ verify(httpResponse).close();
+ verify(statusLine).getStatusCode();
+ verifyNoMoreInteractions(httpClient, httpResponse, statusLine);
+ }
+} \ No newline at end of file
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/utils/TestDmaapUtil.java b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/utils/TestDmaapUtil.java
new file mode 100644
index 000000000..1567242b5
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/java/org/onap/appc/adapter/messaging/dmaap/utils/TestDmaapUtil.java
@@ -0,0 +1,268 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.appc.adapter.messaging.dmaap.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.util.Properties;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.appc.configuration.ConfigurationFactory;
+
+public class TestDmaapUtil {
+ private static Class<?> configurationFactoryClass;
+ private static Field configField;
+
+ @Test
+ public void testCreateConsumerPropFile() {
+ String topic = "JunitTopicOne";
+ Properties junitProps = new Properties();
+ junitProps.put("host", "192.168.10.10");
+ junitProps.put("group", "junit-client");
+ junitProps.put("id", "junit-consumer-one");
+ junitProps.put("filter", "none");
+
+ String junitFile = null;
+
+ // ensure file path property is not set
+ if (System.getProperty(DmaapUtil.DMAAP_PROPERTIES_PATH) != null) {
+ System.clearProperty(DmaapUtil.DMAAP_PROPERTIES_PATH);
+
+ // set configuration to null to force reloading of properties
+ try {
+ configField.set(null, null);
+ } catch (IllegalArgumentException | IllegalAccessException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+ }
+ try {
+ junitFile = DmaapUtil.createConsumerPropFile(topic, junitProps);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Exception creating consumer property file");
+ }
+
+ assertNotNull(junitFile);
+
+ // open file and verify properties
+ File testFile = new File(junitFile);
+ assertTrue(testFile.exists());
+
+ InputStream is = null;
+ Properties testProps = new Properties();
+ try {
+ is = new FileInputStream(testFile);
+ testProps.load(is);
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ fail("Exception opening consumer property file");
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Exception opening consumer property file");
+ } finally {
+ try {
+ if (is != null) {
+ is.close();
+ }
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ fail("Exception closing consumer property file");
+ }
+ }
+
+ assertFalse(testProps.isEmpty());
+
+ assertEquals(testProps.get("host"), "192.168.10.10");
+ assertEquals(testProps.get("group"), "junit-client");
+ assertEquals(testProps.get("id"), "junit-consumer-one");
+ assertEquals(testProps.get("filter"), "none");
+ assertEquals(testProps.get("TransportType"), "HTTPNOAUTH");
+ }
+
+ @Test
+ public void testCreateConsumerPropFileWithCustomProfile() {
+ String topic = "JunitTopicOne";
+ Properties junitProps = new Properties();
+ junitProps.put("host", "192.168.10.10");
+ junitProps.put("group", "junit-client");
+ junitProps.put("id", "junit-consumer-two");
+ junitProps.put("filter", "none");
+
+ String junitFile = null;
+
+ // set property for DMaaP profile
+ System.setProperty(DmaapUtil.DMAAP_PROPERTIES_PATH, "src/test/resources/org/onap/appc");
+
+ // set configuration to null to force reloading of properties
+ try {
+ configField.set(null, null);
+ } catch (IllegalArgumentException | IllegalAccessException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+
+ try {
+ junitFile = DmaapUtil.createConsumerPropFile(topic, junitProps);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Exception creating consumer property file");
+ }
+
+ assertNotNull(junitFile);
+
+ // open file and verify properties
+ File testFile = new File(junitFile);
+ assertTrue(testFile.exists());
+
+ InputStream is = null;
+ Properties testProps = new Properties();
+ try {
+ is = new FileInputStream(testFile);
+ testProps.load(is);
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ fail("Exception opening consumer property file");
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Exception opening consumer property file");
+ } finally {
+ try {
+ if (is != null) {
+ is.close();
+ }
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ fail("Exception closing consumer property file");
+ }
+ }
+
+ assertFalse(testProps.isEmpty());
+
+ assertEquals(testProps.get("host"), "192.168.10.10");
+ assertEquals(testProps.get("group"), "junit-client");
+ assertEquals(testProps.get("id"), "junit-consumer-two");
+ assertEquals(testProps.get("filter"), "none");
+ assertEquals(testProps.get("TransportType"), "HTTPAAF");
+ }
+
+ @Test
+ public void testCreateProducerPropFile() {
+ String topic = "JunitTopicOne";
+ Properties junitProps = new Properties();
+ junitProps.put("host", "192.168.10.10");
+ junitProps.put("group", "junit-client");
+ junitProps.put("id", "junit-producer-one");
+ junitProps.put("filter", "none");
+
+ String junitFile = null;
+
+ // ensure file path property is not set
+ if (System.getProperty(DmaapUtil.DMAAP_PROPERTIES_PATH) != null) {
+ System.clearProperty(DmaapUtil.DMAAP_PROPERTIES_PATH);
+
+ // set configuration to null to force reloading of properties
+ try {
+ configField.set(null, null);
+ } catch (IllegalArgumentException | IllegalAccessException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+ }
+
+ try {
+ junitFile = DmaapUtil.createProducerPropFile(topic, junitProps);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Exception creating consumer property file");
+ }
+
+ assertNotNull(junitFile);
+
+ // open file and verify properties
+ File testFile = new File(junitFile);
+ assertTrue(testFile.exists());
+
+ InputStream is = null;
+ Properties testProps = new Properties();
+ try {
+ is = new FileInputStream(testFile);
+ testProps.load(is);
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ fail("Exception opening consumer property file");
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Exception opening consumer property file");
+ } finally {
+ try {
+ if (is != null) {
+ is.close();
+ }
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ fail("Exception closing consumer property file");
+ }
+ }
+
+ assertFalse(testProps.isEmpty());
+
+ assertEquals(testProps.get("host"), "192.168.10.10");
+ assertEquals(testProps.get("group"), "junit-client");
+ assertEquals(testProps.get("id"), "junit-producer-one");
+ assertEquals(testProps.get("filter"), "none");
+ assertEquals("HTTPNOAUTH", testProps.get("TransportType"));
+ }
+
+ /**
+ * Use reflection to locate fields and methods so that they can be
+ * manipulated during the test to change the internal state accordingly.
+ *
+ * @throws NoSuchFieldException
+ * if the field(s) dont exist
+ * @throws SecurityException
+ * if reflective access is not allowed
+ * @throws NoSuchMethodException
+ * If the method(s) dont exist
+ */
+ @SuppressWarnings("nls")
+ @BeforeClass
+ public static void once() throws NoSuchFieldException, SecurityException, NoSuchMethodException {
+ configurationFactoryClass = ConfigurationFactory.class;
+
+ configField = configurationFactoryClass.getDeclaredField("config");
+ configField.setAccessible(true);
+ }
+}
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/DCAEResponse.txt b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/DCAEResponse.txt
new file mode 100644
index 000000000..884a8525f
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/DCAEResponse.txt
@@ -0,0 +1,20 @@
+{
+"requestTime": "0000-00-00 14:31:19.653787",
+"policyVersion": "13",
+"VMName": "123",
+"from": "test",
+"msgOid": ".1.3.6.1.4.1.193.183.4.1.3.5.1.4",
+"trapID": "1234567",
+"requestClient": "test",
+"message": "Abnormal condition detected",
+"time": "123567890",
+"policyName": "RESTART",
+"trapIDOID": ".1.3.6.1.4.1.193.183.4.1.3.5.1.3",
+"request": "Restart",
+"OPS_CL_timer": "15",
+"nOID": ".1.3.6.1.4.1.193.183.4.2.0.4",
+"AgentAddress": "192.168.1.2",
+"vmOID": ".1.3.6.1.4.1.193.183.4.1.2.1",
+"AICTenantID": "0123456789abcdef0123456789",
+"AICVServerSelfLink": "http://somewhere"
+} \ No newline at end of file
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/consumer.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/consumer.properties
new file mode 100644
index 000000000..709619452
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/consumer.properties
@@ -0,0 +1,54 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH >
+TransportType=HTTPAAF
+Latitude =50.000000
+Longitude =-100.000000
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events
+Environment =TEST
+Partner=BOT_R
+routeOffer=MR1
+SubContextPath =/
+Protocol =http
+MethodType =GET
+contenttype =application/json
+#authKey=01234567890abcde:01234567890abcdefghijklmn
+#authDate=2016-02-18T13:57:37-0800
+host=127.0.0.1
+topic=org.onap.appc.UNIT-TEST
+group=jmsgrp
+id=2
+timeout=15000
+limit=1000
+filter=
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=preferredRoute.txt
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/default.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/default.properties
new file mode 100644
index 000000000..8dea0a61d
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/default.properties
@@ -0,0 +1,38 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+org.onap.appc.bootstrap.file=test.properties
+org.onap.appc.bootstrap.path=/opt/onap/appc/data/properties,${user.home},.
+
+# Properties commented out below are provided in appc.properties
+poolMembers=10.0.0.1
+#event.pool.members=<DMAAP_IP>:3904
+
+topic.read=APPC-CL
+topic.read.timeout=5
+topic.write=APPC-CL
+event.topic.write=APPC-CL
+client.name=APPC-CLIENT-DMAAP-ADAPTER-TEST
+client.name.id=0
+
+metric.enabled=false;
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/producer.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/producer.properties
new file mode 100644
index 000000000..9cc7f2e55
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/org/onap/appc/producer.properties
@@ -0,0 +1,52 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+#TransportType-Specify which way user want to use. I.e. <HTTPAAF,DME2,HTTPAUTH >
+TransportType=HTTPAAF
+Latitude =50.000000
+Longitude =-100.000000
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.openecomp.org/events
+Environment =TEST
+Partner=BOT_R
+SubContextPath =/
+Protocol =http
+MethodType =POST
+contenttype = application/json
+authKey=01234567890abcde:01234567890abcdefghijklmn
+authDate=2016-07-20T11:30:56-0700
+host=127.0.0.1
+topic=org.onap.appc.UNIT-TEST
+partition=2
+maxBatchSize=100
+maxAgeMs=250
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=preferredRoute.txt
+MessageSentThreadOccurance=50
diff --git a/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/test.properties b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/test.properties
new file mode 100644
index 000000000..a451739bf
--- /dev/null
+++ b/services/appc-dmaap-service/appc-dmaap-adapter-bundle/src/test/resources/test.properties
@@ -0,0 +1,37 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END=========================================================
+###
+
+# Properties commented out below are provided in appc.properties
+#poolMembers=<DMAAP_IP>:3904
+#event.pool.members=<DMAAP_IP>:3904
+
+topic.read=APPC-TEST2
+topic.read.timeout=5
+topic.write=APPC-TEST2
+event.topic.write=APPC-TEST2
+client.name=APPC-CLIENT-DMAAP-ADAPTER-TEST
+client.name.id=0
+#client.key=fakeKey
+#client.secret=fakeSecret
+#event.client.key=fakeKey
+#event.client.secret=fakeSecret