summaryrefslogtreecommitdiffstats
path: root/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java
diff options
context:
space:
mode:
authorPatrick Brady <pb071s@att.com>2017-02-15 23:11:26 -0800
committerPatrick Brady <pb071s@att.com>2017-02-15 23:13:06 -0800
commit1c192d2dd68724e292b6a30f463085a262e1e813 (patch)
treed0e2b3a396e169863cd0efaa835c8675e9d5aaac /appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java
parentc69ba05c7508aa7d7f675189a45c8c87569369ef (diff)
Moving all files to root directory
Change-Id: Ica5535fd6ec85f350fe1640b42137b49f83f10f0 Signed-off-by: Patrick Brady <pb071s@att.com>
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java')
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java113
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java58
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java105
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java68
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java163
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java26
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java133
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java35
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java45
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java46
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java64
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java99
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java56
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java143
14 files changed, 1154 insertions, 0 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java
new file mode 100644
index 000000000..98b8aa158
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java
@@ -0,0 +1,113 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap;
+
+import org.openecomp.appc.configuration.ConfigurationFactory;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+import java.util.Properties;
+
+/**
+ * This activator is used to initialize and terminate the connection pool to one or more providers.
+ * <p>
+ * The CDP abstraction layer supports multiple types of providers, with each provider supporting multiple tenants. The
+ * "connection" to a specific tenant on a specific provider is represented by a "context" object. These context objects
+ * are authenticated to a specific tenant on the provider, but can be reused from one request to another. Contexts are
+ * slow to set up and are resource intensive, so they are cached. However, the contexts for a specific tenant on a
+ * specific provider must be cached separately.
+ * </p>
+ * <p>
+ * Activation of the bundle creates an empty cache which is organized first by provider type, then by tenant name, with
+ * the contents being an empty pool of contexts for that provider/tenant combination. The pool is created on first use,
+ * and retained for as long as the bundle is active.
+ * </p>
+ * <p>
+ * When the bundle is deactivated, the cache is torn down with all contexts being closed.
+ * </p>
+ */
+public class AppcDmaapAdapterActivator implements BundleActivator {
+ private ServiceRegistration registration = null;
+
+ /**
+ * The logger to be used
+ */
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AppcDmaapAdapterActivator.class);
+
+ /**
+ * Called when this bundle is started so the Framework can perform the bundle-specific activities necessary to start
+ * this bundle. This method can be used to register services or to allocate any resources that this bundle needs.
+ * <p>
+ * This method must complete and return to its caller in a timely manner.
+ * </p>
+ *
+ * @param bundleContext
+ * The execution context of the bundle being started.
+ * @throws java.lang.Exception
+ * If this method throws an exception, this bundle is marked as stopped and the Framework will remove
+ * this bundle's listeners, unregister all services registered by this bundle, and release all services
+ * used by this bundle.
+ * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext)
+ */
+ @Override
+ public void start(final BundleContext bundleContext) throws Exception {
+ LOG.info("Starting Bundle " + getName());
+
+ /* if (registration == null) {
+ Properties properties = ConfigurationFactory.getConfiguration().getProperties();
+ registration = bundleContext.registerService(EventSender.class, new EventSenderImpl(properties), null);
+ }*/
+ }
+
+ /**
+ * Called when this bundle is stopped so the Framework can perform the bundle-specific activities necessary to stop
+ * the bundle. In general, this method should undo the work that the BundleActivator.start method started. There
+ * should be no active threads that were started by this bundle when this bundle returns. A stopped bundle must not
+ * call any Framework objects.
+ * <p>
+ * This method must complete and return to its caller in a timely manner.
+ * </p>
+ *
+ * @param ctx
+ * The execution context of the bundle being stopped.
+ * @throws java.lang.Exception
+ * If this method throws an exception, the bundle is still marked as stopped, and the Framework will
+ * remove the bundle's listeners, unregister all services registered by the bundle, and release all
+ * services used by the bundle. *
+ * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext)
+ */
+ @Override
+ public void stop(BundleContext ctx) throws Exception {
+ /*if (this.registration != null) {
+ this.registration.unregister();
+ }*/
+ LOG.info("Stopped Bundle " + getName());
+ }
+
+ public String getName() {
+ return "DMaaP Adapter";
+ }
+
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java
new file mode 100644
index 000000000..7c282911d
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+public class CallableConsumer implements Callable<List<String>> {
+
+ private Consumer consumer;
+
+ private int timeout = 15000;
+ private int limit = 1000;
+
+ public CallableConsumer(Consumer c) {
+ this.consumer = c;
+ }
+
+ public CallableConsumer(Consumer c, int waitMs, int fetchSize) {
+ this.consumer = c;
+ this.timeout = waitMs;
+ this.limit = fetchSize;
+ }
+
+ @Override
+ public List<String> call() {
+ return consumer.fetch(timeout, limit);
+ }
+
+ /**
+ * The maximum amount of time to keep a connection alive. Currently is set to waitMs + 10s
+ *
+ * @return An integer representing the maximum amount of time to keep this thread alive
+ */
+ public int getMaxLife() {
+ return 10000 + timeout;
+ }
+
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java
new file mode 100644
index 000000000..654ec6f7f
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java
@@ -0,0 +1,105 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap;
+
+import java.net.URI;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.config.RequestConfig.Builder;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+public class CommonHttpClient {
+
+ public static final int HTTPS_PORT = 3905;
+
+ private String AUTH_STR;
+
+ protected void setBasicAuth(String username, String password) {
+ if (username != null && password != null) {
+ String plain = String.format("%s:%s", username, password);
+ AUTH_STR = Base64.encodeBase64String(plain.getBytes());
+ } else {
+ AUTH_STR = null;
+ }
+ }
+
+ public HttpGet getReq(URI uri, int timeoutMs) throws Exception {
+ HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri);
+ if (AUTH_STR != null) {
+ out.setHeader("Authorization", String.format("Basic %s", AUTH_STR));
+ }
+ out.setConfig(getConfig(timeoutMs));
+ return out;
+ }
+
+ public HttpPost postReq(String url) throws Exception {
+ HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url);
+ if (AUTH_STR != null) {
+ out.setHeader("Authorization", String.format("Basic %s", AUTH_STR));
+ }
+ out.setConfig(getConfig(0));
+ return out;
+ }
+
+ private RequestConfig getConfig(int timeoutMs) {
+ Builder builder = RequestConfig.custom();
+ builder.setSocketTimeout(timeoutMs + 5000);
+ return builder.build();
+ }
+
+ public CloseableHttpClient getClient() {
+ return getClient(false);
+ }
+
+ public CloseableHttpClient getClient(boolean useHttps) {
+ return HttpClientBuilder.create().build();
+ }
+
+ public String formatHostString(String host) {
+ return formatHostString(host, host.contains(String.valueOf(HTTPS_PORT)));
+ }
+
+ public String formatHostString(String host, boolean useHttps) {
+ // Trim trailing slash
+ String out = host.endsWith("/") ? host.substring(0, host.length() - 1) : host;
+
+ boolean hasProto = out.startsWith("http");
+ boolean hasPort = out.contains(":");
+
+ // Add protocol
+ if (!hasProto) {
+ out = String.format("%s%s", (useHttps) ? "https://" : "http://", out);
+ }
+
+ // Add port
+ if (!hasPort) {
+ out = String.format("%s:%d", out, (useHttps) ? 3905 : 3904);
+ }
+
+ return out;
+
+ }
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java
new file mode 100644
index 000000000..32034e5fb
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java
@@ -0,0 +1,68 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap;
+
+import java.util.List;
+
+public interface Consumer {
+
+ /**
+ * Gets a batch of messages from the topic. Defaults to 1000 messages with 15s wait for messages if empty.
+ *
+ * @return A list of strings representing the messages pulled from the topic.
+ */
+ public List<String> fetch();
+
+ /**
+ * Gets a batch of messages from the topic.
+ *
+ * @param waitMs
+ * The amount of time to wait in milliseconds if the topic is empty for data to be written. Should be no
+ * less than 15000ms to prevent too many requests
+ * @param limit
+ * The amount of messages to fetch
+ * @return A list of strings representing the messages pulled from the topic.
+ */
+ public List<String> fetch(int waitMs, int limit);
+
+ /**
+ * Updates the api credentials for making authenticated requests
+ *
+ * @param apiKey
+ * The public key to authenticate with
+ * @param apiSecret
+ * The secret key to authenticate with
+ */
+ public void updateCredentials(String apiKey, String apiSecret);
+
+ // TODO - Implement once Cambria allows you to set outside of constructor
+ // public void setFilter(String filter);
+
+ /**
+ * Creates a dmaap client using a https connection
+ *
+ * @param yes
+ * True if https should be used, false otherwise
+ */
+ public void useHttps(boolean yes);
+
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java
new file mode 100644
index 000000000..6e16d896b
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java
@@ -0,0 +1,163 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+import org.json.JSONArray;
+
+public class DmaapConsumer extends CommonHttpClient implements Consumer {
+
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumer.class);
+
+ // Default values
+ private static final int DEFAULT_TIMEOUT_MS = 15000;
+ private static final int DEFAULT_LIMIT = 1000;
+ private static final String HTTPS_PORT = ":3905";
+ private static final String URL_TEMPLATE = "%s/events/%s/%s/%s";
+
+ private List<String> urls;
+ private String filter;
+
+ private boolean useHttps = false;
+
+ public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId) {
+ this(hosts, topicName, consumerName, consumerId, null);
+ }
+
+ public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId,
+ String filter) {
+ this(hosts, topicName, consumerName, consumerId, filter, null, null);
+ }
+
+ public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId,
+ String filter, String user, String password) {
+ urls = new ArrayList<String>();
+ for (String host : hosts) {
+ urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
+ }
+ this.filter = filter;
+ updateCredentials(user, password);
+ }
+
+ @Override
+ public void updateCredentials(String user, String pass) {
+ LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
+ this.setBasicAuth(user, pass);
+ }
+
+ @Override
+ public List<String> fetch(int waitMs, int limit) {
+ LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
+ List<String> out = new ArrayList<String>();
+ try {
+ List<NameValuePair> urlParams = new ArrayList<NameValuePair>();
+ urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs)));
+ urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit)));
+ if (filter != null) {
+ urlParams.add(new BasicNameValuePair("filter", filter));
+ }
+
+ URIBuilder builder = new URIBuilder(urls.get(0));
+ builder.setParameters(urlParams);
+
+ URI uri = builder.build();
+ LOG.info(String.format("GET %s", uri));
+ HttpGet request = getReq(uri, waitMs);
+ CloseableHttpResponse response = getClient().execute(request);
+
+ int httpStatus = response.getStatusLine().getStatusCode();
+ HttpEntity entity = response.getEntity();
+ String body = (entity != null) ? EntityUtils.toString(entity) : null;
+
+ LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
+ (body != null ? body.length() : "null")));
+
+ response.close();
+ if (httpStatus == 200 && body != null) {
+ JSONArray json = new JSONArray(body);
+ LOG.info(String.format("Got %d messages from DMaaP", json.length()));
+ for (int i = 0; i < json.length(); i++) {
+ out.add(json.getString(i));
+ }
+ } else {
+ LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
+ sleep(waitMs);
+ }
+ } catch (Exception e) {
+ if (urls.size() > 1) {
+ String failedUrl = urls.remove(0);
+ urls.add(failedUrl);
+ LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
+ urls.get(0)));
+ }
+ LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
+ sleep(waitMs);
+ }
+
+ return out;
+ }
+
+ @Override
+ public List<String> fetch() {
+ return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
+ }
+
+ @Override
+ public String toString() {
+ String hostStr = (urls == null && !urls.isEmpty()) ? "N/A" : urls.get(0);
+ return String.format("Consumer listening to [%s]", hostStr);
+ }
+
+ @Override
+ public void useHttps(boolean yes) {
+ useHttps = yes;
+ }
+
+ private void sleep(int ms) {
+ LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000));
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e1) {
+ LOG.error("Interrupted while sleeping");
+ }
+ }
+
+ public void close(){
+ //not used yet
+ }
+
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java
new file mode 100644
index 000000000..efbe194ba
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java
@@ -0,0 +1,26 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap;
+
+public enum DmaapDestination {
+ DCAE
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java
new file mode 100644
index 000000000..6845177b1
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java
@@ -0,0 +1,133 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+
+public class DmaapProducer extends CommonHttpClient implements Producer {
+
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducer.class);
+
+ private static final String CONTENT_TYPE = "application/cambria";
+ private static final String URL_TEMPLATE = "%s/events/%s";
+
+ private List<String> hosts;
+ private Set<String> topics;
+
+ private boolean useHttps = false;
+
+ public DmaapProducer(Collection<String> urls, String topicName) {
+ hosts = new ArrayList<String>();
+ topics = new HashSet<String>();
+ topics.add(topicName);
+
+ for (String host : urls) {
+ hosts.add(formatHostString(host));
+ }
+ }
+
+ public DmaapProducer(Collection<String> urls, Set<String> topicNames) {
+ hosts = new ArrayList<String>();
+ topics = topicNames;
+
+ for (String host : urls) {
+ hosts.add(formatHostString(host));
+ }
+ }
+
+ @Override
+ public void updateCredentials(String user, String pass) {
+ LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
+ this.setBasicAuth(user, pass);
+ }
+
+ @Override
+ public boolean post(String partition, String data) {
+ int sent = 0;
+ try {
+ HttpPost request = postReq(null);
+ request.setHeader("Content-Type", CONTENT_TYPE);
+ request.setEntity(new StringEntity(bodyLine(partition, data)));
+
+ for (String topic : topics) {
+ String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic);
+ try {
+ request.setURI(new URI(uriStr));
+ CloseableHttpResponse response = getClient().execute(request);
+ if (response.getStatusLine().getStatusCode() == 200) {
+ sent++;
+ }
+ response.close();
+ } catch (Exception sendEx) {
+ LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()),
+ sendEx);
+ if (hosts.size() > 1) {
+ String failedUrl = hosts.remove(0);
+ hosts.add(failedUrl);
+ LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s",
+ failedUrl, hosts.get(0)));
+ }
+ }
+ }
+ } catch (Exception buildEx) {
+ LOG.error(
+ String.format("Failed to build request with string [%s]. Message not sent to any topic. Reason: %s",
+ data, buildEx.getMessage()),
+ buildEx);
+ }
+ return sent == topics.size();
+ }
+
+ @Override
+ public void useHttps(boolean yes) {
+ useHttps = yes;
+ }
+
+ /**
+ * Format the body for the application/cambria content type with no partitioning.
+ *
+ * @param msg
+ * The message body to format
+ * @return A string in the application/cambria content type
+ */
+ private String bodyLine(String partition, String msg) {
+ String p = (partition == null) ? "" : partition;
+ String m = (msg == null) ? "" : msg;
+ return String.format("%d.%d.%s%s", p.length(), m.length(), p, m);
+ }
+
+ public void close(){
+ //not used yet
+ }
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java
new file mode 100644
index 000000000..7d4a7c090
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java
@@ -0,0 +1,35 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap;
+
+import java.util.Map;
+
+import org.openecomp.appc.adapter.dmaap.event.EventMessage;
+import org.openecomp.appc.exceptions.APPCException;
+import org.openecomp.sdnc.sli.SvcLogicContext;
+import org.openecomp.sdnc.sli.SvcLogicJavaPlugin;
+
+
+public interface EventSender extends SvcLogicJavaPlugin{
+ boolean sendEvent(DmaapDestination destination, EventMessage msg);
+ boolean sendEvent(DmaapDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException;
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java
new file mode 100644
index 000000000..183e618ba
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java
@@ -0,0 +1,45 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap;
+
+import java.util.Set;
+
+public interface Manager {
+
+ /**
+ * Updates the api credentials for making authenticated requests
+ *
+ * @param apiKey
+ * The public key to authenticate with
+ * @param apiSecret
+ * The secret key to authenticate with
+ */
+ public void updateCredentials(String apiKey, String apiSecret);
+
+ /**
+ * Return a set of strings representing topics that the user can see
+ *
+ * @return A set of strings with topic names or an empty set if no topics are visible
+ */
+ public Set<String> getTopics();
+
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java
new file mode 100644
index 000000000..f19c516be
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java
@@ -0,0 +1,46 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap;
+
+public interface Producer {
+
+ public boolean post(String partition, String data);
+
+ /**
+ * Updates the api credentials for making authenticated requests
+ *
+ * @param apiKey
+ * The public key to authenticate with
+ * @param apiSecret
+ * The secret key to authenticate with
+ */
+ public void updateCredentials(String apiKey, String apiSecret);
+
+ /**
+ * Creates a dmaap client using a https connection
+ *
+ * @param yes
+ * True if https should be used, false otherwise
+ */
+ public void useHttps(boolean yes);
+
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java
new file mode 100644
index 000000000..dd951fe37
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java
@@ -0,0 +1,64 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap.event;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+public class EventHeader {
+
+ @JsonProperty("eventTime")
+ private final String eventTime;
+
+ @JsonProperty("apiVer")
+ private final String apiVer;
+
+ @JsonProperty("eventId")
+ private final String eventId;
+
+ public EventHeader(String eventTime, String apiVer, String eventId) {
+ this.eventTime = eventTime;
+ this.apiVer = apiVer;
+ this.eventId = eventId;
+ }
+
+ public String getEventTime() {
+ return eventTime;
+ }
+
+ public String getApiVer() {
+ return apiVer;
+ }
+
+ public String getEventId() {
+ return eventId;
+ }
+
+ @Override
+ public String toString() {
+ return "EventHeader{" +
+ "eventTime='" + eventTime + '\'' +
+ ", apiVer='" + apiVer + '\'' +
+ ", eventId='" + eventId + '\'' +
+ '}';
+ }
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java
new file mode 100644
index 000000000..af5cff2f9
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java
@@ -0,0 +1,99 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap.event;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize.Inclusion;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/*
+ {
+ "EventHeader": {
+ "eventTime": "2016-03-15T10:59:33.79Z",
+ "apiVer": "1.01",
+ "EventId": "<ECOMP_EVENT_ID>",
+ },
+ "EventStatus": {
+ "code": "NNN",
+ "reason": "A reason"
+ }
+ }
+*/
+
+
+
+
+@JsonSerialize(include = Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class EventMessage implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @JsonProperty("eventHeader")
+ private EventHeader eventHeader;
+ @JsonProperty("eventStatus")
+ private EventStatus eventStatus;
+
+ public EventMessage(EventHeader eventHeader, EventStatus eventStatus) {
+ this.eventHeader = eventHeader;
+ this.eventStatus = eventStatus;
+ }
+
+ public EventHeader getEventHeader() {
+ return eventHeader;
+ }
+
+ public void setEventHeader(EventHeader eventHeader) {
+ this.eventHeader = eventHeader;
+ }
+
+ public EventStatus getEventStatus() {
+ return eventStatus;
+ }
+
+ public void setEventStatus(EventStatus eventStatus) {
+ this.eventStatus = eventStatus;
+ }
+
+ public String toJson() {
+ try {
+ return OBJECT_MAPPER.writeValueAsString(this);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "EventMessage{" +
+ "eventHeader=" + eventHeader +
+ ", eventStatus=" + eventStatus +
+ '}';
+ }
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java
new file mode 100644
index 000000000..f5d7a59d4
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java
@@ -0,0 +1,56 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap.event;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+public class EventStatus {
+
+ @JsonProperty("code")
+ private final Integer code;
+
+ @JsonProperty("reason")
+ private final String reason;
+
+ public EventStatus(Integer code, String aReason) {
+ this.code = code;
+ reason = aReason;
+ }
+
+
+ public Integer getCode() {
+ return code;
+ }
+
+ public String getReason() {
+ return reason;
+ }
+
+ @Override
+ public String toString() {
+ return "EventStatus{" +
+ "code=" + code +
+ ", reason='" + reason + '\'' +
+ '}';
+ }
+}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java
new file mode 100644
index 000000000..0f7d40f5b
--- /dev/null
+++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java
@@ -0,0 +1,143 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : APP-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.adapter.dmaap.impl;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.openecomp.appc.adapter.dmaap.EventSender;
+import org.openecomp.appc.adapter.dmaap.Producer;
+import org.openecomp.appc.adapter.dmaap.DmaapDestination;
+import org.openecomp.appc.adapter.dmaap.event.EventHeader;
+import org.openecomp.appc.adapter.dmaap.event.EventMessage;
+import org.openecomp.appc.adapter.dmaap.event.EventStatus;
+import org.openecomp.appc.adapter.dmaap.DmaapProducer;
+import org.openecomp.appc.configuration.Configuration;
+import org.openecomp.appc.configuration.ConfigurationFactory;
+import org.openecomp.appc.exceptions.APPCException;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.openecomp.sdnc.sli.SvcLogicContext;
+
+
+public class EventSenderImpl implements EventSender
+{
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderImpl.class);
+ public static final String EVENT_TOPIC_WRITE = "event.topic.write";
+ public static final String EVENT_CLIENT_KEY = "event.client.key";
+ public static final String EVENT_CLIENT_SECRET = "event.client.secret";
+ public static final String EVENT_POOL_MEMBERS = "event.pool.members";
+
+ private static Configuration configuration = ConfigurationFactory.getConfiguration();
+
+ private Map<String,Producer> producerMap = new ConcurrentHashMap<>();
+
+ public Map<String, Producer> getProducerMap() {
+ return producerMap;
+ }
+
+ public void setProducerMap(Map<String, Producer> producerMap) {
+ this.producerMap = producerMap;
+ }
+
+ public EventSenderImpl(){
+
+ }
+
+ public void initialize(){
+ Properties properties = configuration.getProperties();
+ String writeTopic;
+ String apiKey;
+ String apiSecret;
+ final List<String> pool = new ArrayList<>();
+
+ for(DmaapDestination destination:DmaapDestination.values()){
+ writeTopic = properties.getProperty(destination + "." + EVENT_TOPIC_WRITE);
+ apiKey = properties.getProperty(destination + "." + EVENT_CLIENT_KEY);
+ apiSecret = properties.getProperty(destination + "." + EVENT_CLIENT_SECRET);
+ String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS);
+
+ if (hostNames != null && !hostNames.isEmpty()) {
+ LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS));
+ Collections.addAll(pool, hostNames.split(","));
+ }
+
+ LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS));
+ LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE));
+ LOG.debug(String.format("apiKey = %s, taken from property: %s", apiKey, destination + "." + EVENT_CLIENT_KEY));
+ Producer producer = new DmaapProducer(pool, writeTopic);
+
+ if (apiKey != null && apiSecret != null) {
+ producer.updateCredentials(apiKey, apiSecret);
+ }
+
+ for (String url : pool) {
+ if (url.contains("3905") || url.contains("https")) {
+ LOG.debug("Producer should use HTTPS");
+ producer.useHttps(true);
+ break;
+ }
+ }
+ producerMap.put(destination.toString(),producer);
+ }
+
+ }
+
+ @Override
+ public boolean sendEvent(DmaapDestination destination,EventMessage msg) {
+ String jsonStr = msg.toJson();
+ String id = msg.getEventHeader().getEventId();
+ LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr));
+ Producer producer = producerMap.get(destination.toString());
+ return producer.post(id, jsonStr);
+ }
+
+ @Override
+ public boolean sendEvent(DmaapDestination destination,Map<String, String> params, SvcLogicContext ctx) throws APPCException {
+
+ if (params == null) {
+ String message = "Parameters map is empty (null)";
+ LOG.error(message);
+ throw new APPCException(message);
+ }
+ String eventTime = new Date(System.currentTimeMillis()).toString();
+ String apiVer = params.get("apiVer");
+ String eventId = params.get("eventId");
+ String reason = params.get("reason");
+ String entityId=params.get("entityId");
+ if(entityId!=null){
+ reason=reason+"("+entityId+")";
+ }
+ Integer code = Integer.getInteger(params.get("code"), 500);
+
+ if (eventTime == null || apiVer == null || eventId == null || reason == null) {
+ String message = String.format("Missing input parameters: %s", params);
+ LOG.error(message);
+ throw new APPCException(message);
+ }
+ EventMessage dmaapEventMessage = new EventMessage(
+ new EventHeader(eventTime, apiVer, eventId),
+ new EventStatus(code, reason));
+
+ return sendEvent(destination,dmaapEventMessage);
+ }
+}