aboutsummaryrefslogtreecommitdiffstats
path: root/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp
diff options
context:
space:
mode:
authorPatrick Brady <pb071s@att.com>2017-12-13 11:14:21 -0800
committerPatrick Brady <pb071s@att.com>2017-12-13 11:14:31 -0800
commit161df8a94bb3b0c34ed16fd4fdba078bd1eeef9a (patch)
tree21f410d053d5e390902d72bb3ebe6889451dd633 /appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp
parent0756759f39e125b02d63b4e93de83b3c6b13beea (diff)
Second part of onap rename
This is the second commit of the rename. The folder structure is renamed for appc-adapters and appc-config in this commit. Change-Id: Iaa2b8c937ff1ca1b5d1178128961fb115ee65d9b Signed-off-by: Patrick Brady <pb071s@att.com> Issue-ID: APPC-13
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp')
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java106
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java108
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java167
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java138
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java234
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java223
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java86
-rw-r--r--appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java178
8 files changed, 0 insertions, 1240 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java
deleted file mode 100644
index 812f80121..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
-
-
-/**
- * This activator is used to initialize and terminate the connection pool to one or more providers.
- * <p>
- * The CDP abstraction layer supports multiple types of providers, with each provider supporting multiple tenants. The
- * "connection" to a specific tenant on a specific provider is represented by a "context" object. These context objects
- * are authenticated to a specific tenant on the provider, but can be reused from one request to another. Contexts are
- * slow to set up and are resource intensive, so they are cached. However, the contexts for a specific tenant on a
- * specific provider must be cached separately.
- * </p>
- * <p>
- * Activation of the bundle creates an empty cache which is organized first by provider type, then by tenant name, with
- * the contents being an empty pool of contexts for that provider/tenant combination. The pool is created on first use,
- * and retained for as long as the bundle is active.
- * </p>
- * <p>
- * When the bundle is deactivated, the cache is torn down with all contexts being closed.
- * </p>
- */
-public class AppcDmaapAdapterActivator implements BundleActivator {
- private ServiceRegistration registration = null;
-
- /**
- * The logger to be used
- */
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AppcDmaapAdapterActivator.class);
-
- /**
- * Called when this bundle is started so the Framework can perform the bundle-specific activities necessary to start
- * this bundle. This method can be used to register services or to allocate any resources that this bundle needs.
- * <p>
- * This method must complete and return to its caller in a timely manner.
- * </p>
- *
- * @param bundleContext
- * The execution context of the bundle being started.
- * @throws java.lang.Exception
- * If this method throws an exception, this bundle is marked as stopped and the Framework will remove
- * this bundle's listeners, unregister all services registered by this bundle, and release all services
- * used by this bundle.
- * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext)
- */
- @Override
- public void start(final BundleContext bundleContext) throws Exception {
- LOG.info("Starting Bundle " + getName());
- }
-
- /**
- * Called when this bundle is stopped so the Framework can perform the bundle-specific activities necessary to stop
- * the bundle. In general, this method should undo the work that the BundleActivator.start method started. There
- * should be no active threads that were started by this bundle when this bundle returns. A stopped bundle must not
- * call any Framework objects.
- * <p>
- * This method must complete and return to its caller in a timely manner.
- * </p>
- *
- * @param ctx
- * The execution context of the bundle being stopped.
- * @throws java.lang.Exception
- * If this method throws an exception, the bundle is still marked as stopped, and the Framework will
- * remove the bundle's listeners, unregister all services registered by the bundle, and release all
- * services used by the bundle. *
- * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext)
- */
- @Override
- public void stop(BundleContext ctx) throws Exception {
- LOG.info("Stopped Bundle " + getName());
- }
-
- public String getName() {
- return "DMaaP Adapter";
- }
-
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
deleted file mode 100644
index 76b050d8e..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap.http;
-
-import java.net.URI;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.config.RequestConfig.Builder;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-
-public class CommonHttpClient {
-
- public static final int HTTPS_PORT = 3905;
-
- private String AUTH_STR;
-
- protected void setBasicAuth(String username, String password) {
- if (username != null && password != null) {
- String plain = String.format("%s:%s", username, password);
- AUTH_STR = Base64.encodeBase64String(plain.getBytes());
- } else {
- AUTH_STR = null;
- }
- }
-
- public HttpGet getReq(URI uri, int timeoutMs) throws Exception {
- HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri);
- if (AUTH_STR != null) {
- out.setHeader("Authorization", String.format("Basic %s", AUTH_STR));
- }
- out.setConfig(getConfig(timeoutMs));
- return out;
- }
-
- public HttpPost postReq(String url) throws Exception {
- HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url);
- if (AUTH_STR != null) {
- out.setHeader("Authorization", String.format("Basic %s", AUTH_STR));
- }
- out.setConfig(getConfig(0));
- return out;
- }
-
- private RequestConfig getConfig(int timeoutMs) {
- Builder builder = RequestConfig.custom();
- builder.setSocketTimeout(timeoutMs + 5000);
- return builder.build();
- }
-
- public CloseableHttpClient getClient() {
- return getClient(false);
- }
-
- public CloseableHttpClient getClient(boolean useHttps) {
- return HttpClientBuilder.create().build();
- }
-
- public String formatHostString(String host) {
- return formatHostString(host, host.contains(String.valueOf(HTTPS_PORT)));
- }
-
- public String formatHostString(String host, boolean useHttps) {
- // Trim trailing slash
- String out = host.endsWith("/") ? host.substring(0, host.length() - 1) : host;
-
- boolean hasProto = out.startsWith("http");
- boolean hasPort = out.contains(":");
-
- // Add protocol
- if (!hasProto) {
- out = String.format("%s%s", (useHttps) ? "https://" : "http://", out);
- }
-
- // Add port
- if (!hasPort) {
- out = String.format("%s:%d", out, (useHttps) ? 3905 : 3904);
- }
-
- return out;
-
- }
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
deleted file mode 100644
index df81e9718..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap.http;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.NameValuePair;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.util.EntityUtils;
-import org.json.JSONArray;
-import org.onap.appc.adapter.message.Consumer;
-
-public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer {
-
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
-
- // Default values
- private static final int DEFAULT_TIMEOUT_MS = 15000;
- private static final int DEFAULT_LIMIT = 1000;
- private static final String HTTPS_PORT = ":3905";
- private static final String URL_TEMPLATE = "%s/events/%s/%s/%s";
-
- private List<String> urls;
- private String filter;
-
- private boolean useHttps = false;
-
- public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId) {
- this(hosts, topicName, consumerName, consumerId, null);
- }
-
- public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
- String filter) {
- this(hosts, topicName, consumerName, consumerId, filter, null, null);
- }
-
- public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
- String filter, String user, String password) {
- urls = new ArrayList<String>();
- for (String host : hosts) {
- urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
- }
- this.filter = filter;
- updateCredentials(user, password);
- }
-
- @Override
- public void updateCredentials(String user, String pass) {
- LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
- this.setBasicAuth(user, pass);
- }
-
- @Override
- public List<String> fetch(int waitMs, int limit) {
- LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
- List<String> out = new ArrayList<String>();
- try {
- List<NameValuePair> urlParams = new ArrayList<NameValuePair>();
- urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs)));
- urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit)));
- if (filter != null) {
- urlParams.add(new BasicNameValuePair("filter", filter));
- }
-
- URIBuilder builder = new URIBuilder(urls.get(0));
- builder.setParameters(urlParams);
-
- URI uri = builder.build();
- LOG.info(String.format("GET %s", uri));
- HttpGet request = getReq(uri, waitMs);
- CloseableHttpResponse response = getClient().execute(request);
-
- int httpStatus = response.getStatusLine().getStatusCode();
- HttpEntity entity = response.getEntity();
- String body = (entity != null) ? EntityUtils.toString(entity) : null;
-
- LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
- (body != null ? body.length() : "null")));
-
- response.close();
- if (httpStatus == 200 && body != null) {
- JSONArray json = new JSONArray(body);
- LOG.info(String.format("Got %d messages from DMaaP", json.length()));
- for (int i = 0; i < json.length(); i++) {
- out.add(json.getString(i));
- }
- } else {
- LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
- sleep(waitMs);
- }
- } catch (Exception e) {
- if (urls.size() > 1) {
- String failedUrl = urls.remove(0);
- urls.add(failedUrl);
- LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
- urls.get(0)));
- }
- LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
- sleep(waitMs);
- }
-
- return out;
- }
-
- @Override
- public List<String> fetch() {
- return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
- }
-
- @Override
- public String toString() {
- String hostStr = (urls == null || urls.isEmpty()) ? "N/A" : urls.get(0);
- return String.format("Consumer listening to [%s]", hostStr);
- }
-
- @Override
- public void useHttps(boolean yes) {
- useHttps = yes;
- }
-
- private void sleep(int ms) {
- LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000));
- try {
- Thread.sleep(ms);
- } catch (InterruptedException e1) {
- LOG.error("Interrupted while sleeping");
- }
- }
-
- @Override
- public void close() {
- // Nothing to do
- }
-
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
deleted file mode 100644
index 560c09be4..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap.http;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.onap.appc.adapter.message.Producer;
-
-public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer {
-
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class);
-
- private static final String CONTENT_TYPE = "application/cambria";
- private static final String URL_TEMPLATE = "%s/events/%s";
-
- private List<String> hosts;
- private Set<String> topics;
-
- private boolean useHttps = false;
-
- public HttpDmaapProducerImpl(Collection<String> urls, String topicName) {
- hosts = new ArrayList<String>();
- topics = new HashSet<String>();
- topics.add(topicName);
-
- for (String host : urls) {
- hosts.add(formatHostString(host));
- }
- }
-
- public HttpDmaapProducerImpl(Collection<String> urls, Set<String> topicNames) {
- hosts = new ArrayList<String>();
- topics = topicNames;
-
- for (String host : urls) {
- hosts.add(formatHostString(host));
- }
- }
-
- @Override
- public void updateCredentials(String user, String pass) {
- LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
- this.setBasicAuth(user, pass);
- }
-
- @Override
- public boolean post(String partition, String data) {
- int sent = 0;
- try {
- HttpPost request = postReq(null);
- request.setHeader("Content-Type", CONTENT_TYPE);
- request.setEntity(new StringEntity(bodyLine(partition, data)));
-
- for (String topic : topics) {
- String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic);
- try {
- request.setURI(new URI(uriStr));
- CloseableHttpResponse response = getClient().execute(request);
- if (response.getStatusLine().getStatusCode() == 200) {
- sent++;
- }
- response.close();
- } catch (Exception sendEx) {
- LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()),
- sendEx);
- if (hosts.size() > 1) {
- String failedUrl = hosts.remove(0);
- hosts.add(failedUrl);
- LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s",
- failedUrl, hosts.get(0)));
- }
- }
- }
- } catch (Exception buildEx) {
- LOG.error(
- String.format("Failed to build request with string [%s]. Message not sent to any topic. Reason: %s",
- data, buildEx.getMessage()),
- buildEx);
- }
- return sent == topics.size();
- }
-
- @Override
- public void useHttps(boolean yes) {
- useHttps = yes;
- }
-
- /**
- * Format the body for the application/cambria content type with no partitioning. See
- *
- * @param msg
- * The message body to format
- * @return A string in the application/cambria content type
- */
- private String bodyLine(String partition, String msg) {
- String p = (partition == null) ? "" : partition;
- String m = (msg == null) ? "" : msg;
- return String.format("%d.%d.%s%s", p.length(), m.length(), p, m);
- }
-
- @Override
- public void close() {
- // Nothing to do
- }
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
deleted file mode 100644
index ddf630545..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap.impl;
-
-import java.io.IOException;
-import java.util.*;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-//import com.att.nsa.cambria.client.CambriaClientBuilders;
-//import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-//import com.att.nsa.cambria.client.CambriaConsumer;
-
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.appc.adapter.message.Consumer;
-import org.onap.appc.configuration.Configuration;
-import org.onap.appc.configuration.ConfigurationFactory;
-import org.onap.appc.metricservice.MetricRegistry;
-import org.onap.appc.metricservice.MetricService;
-import org.onap.appc.metricservice.impl.MetricServiceImpl;
-import org.onap.appc.metricservice.metric.Metric;
-import org.onap.appc.metricservice.metric.MetricType;
-import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
-import org.onap.appc.metricservice.policy.PublishingPolicy;
-import org.onap.appc.metricservice.publisher.LogPublisher;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.ServiceReference;
-
-public class DmaapConsumerImpl implements Consumer {
-
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
- private static final Configuration configuration = ConfigurationFactory.getConfiguration();
- // Default values
- private static final int DEFAULT_TIMEOUT_MS = 60000;
- private static final int DEFAULT_LIMIT = 1000;
- private static MetricRegistry metricRegistry;
- private String topic;
- private DmaapRequestCounterMetric dmaapKpiMetric;
- private boolean isMetricEnabled=false;
- private boolean useHttps = false;
- private MRConsumer client = null;
- private Properties props = null;
-
-
- public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password) {
- this(urls, topicName, consumerGroupName, consumerId,user, password,null);
-
- }
-
- public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password,String filter) {
- this.topic = topicName;
- this.props = new Properties();
- String urlsStr = StringUtils.join(urls, ',');
- props.setProperty("host",urlsStr);
- props.setProperty("group",consumerGroupName);
- props.setProperty("id",consumerId);
- props.setProperty("username",user);
- props.setProperty("password",password);
- if(filter != null) {
- props.setProperty("filter", filter);
- }
- }
-
-
- private void initMetric() {
- LOG.debug("Metric getting initialized");
- MetricService metricService = getMetricservice();
- metricRegistry = metricService.createRegistry("APPC");
- dmaapKpiMetric = metricRegistry.metricBuilderFactory().
- dmaapRequestCounterBuilder().
- withName("DMAAP_KPI").withType(MetricType.COUNTER).
- withRecievedMessage(0)
- .withPublishedMessage(0)
- .build();
- if (metricRegistry.register(dmaapKpiMetric)) {
- Metric[] metrics = new Metric[]{dmaapKpiMetric};
- LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
- LogPublisher[] logPublishers = new LogPublisher[1];
- logPublishers[0] = logPublisher;
- PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory().
- scheduledPolicyBuilder().withPublishers(logPublishers).
- withMetrics(metrics).
- build();
- LOG.debug("Policy getting initialized");
- manuallyScheduledPublishingPolicy.init();
- LOG.debug("Metric initialized");
- }
- }
- private MRConsumer getClient() {
- return getClient(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
- }
-
- /**
- * @return An instance of MRConsumer created from our class variables
- */
- private synchronized MRConsumer getClient(int waitMs, int limit) {
- try {
- props.setProperty("timeout",String.valueOf(waitMs));
- props.setProperty("limit",String.valueOf(limit));
- String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props);
- return MRClientFactory.createConsumer ( topicProducerPropFileName);
- } catch (IOException e1) {
- LOG.error("failed to createConsumer",e1);
- return null;
- }
- }
-
- @Override
- public synchronized void updateCredentials(String key, String secret) {
- LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
- String user = key;
- String password = secret;
- props.setProperty("user",String.valueOf(user));
- props.setProperty("password",String.valueOf(password));
- client = null;
- }
-
- @Override
- public List<String> fetch(int waitMs, int limit) {
- Properties properties=configuration.getProperties();
- if(properties!=null && properties.getProperty("metric.enabled")!=null ){
- isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled"));
- }
- if(isMetricEnabled){
- initMetric();
- }
- LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
- List<String> out = new ArrayList<String>();
-
- // Create client once and reuse it on subsequent fetches. This is
- // to support failover to other servers in the DMaaP cluster.
- if (client == null) {
- LOG.info("Getting DMaaP Client ...");
- client = getClient(waitMs, limit);
- }
- try {
- for (String s : client.fetch(waitMs, limit)) {
- out.add(s);
- if(isMetricEnabled){
- ((DmaapRequestCounterMetric)metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage();
- }
- }
- LOG.debug(String.format("Got %d records from %s", out.size(), this.toString()));
- } catch (Exception e) {
- // Connection exception
- LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage()));
- e.printStackTrace();
- try {
- LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs));
- Thread.sleep(waitMs);
- } catch (InterruptedException e2) {
- LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs));
- }
- }
-
-
- return out;
- }
-
- /**
- * Close consumer Dmaap client
- */
- @Override
- public void close() {
- LOG.debug("Closing Dmaap consumer client....");
- if (client != null) {
- client.close();
- }
- }
-
- @Override
- public List<String> fetch() {
- return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
- }
-
- @Override
- public String toString() {
- String hostStr = (props == null || props.getProperty("host") == null? "N/A" : props.getProperty("host"));
- String group = (props == null || props.getProperty("group") == null? "N/A" : props.getProperty("group"));
- String id = (props == null || props.getProperty("id") == null? "N/A" : props.getProperty("id"));
- return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr);
- }
-
- @Override
- public void useHttps(boolean yes) {
- useHttps = yes;
- }
-
-
- private MetricService getMetricservice() {
- BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext();
- // Get AAIadapter reference
- ServiceReference sref = bctx.getServiceReference(MetricService.class.getName());
- if (sref != null) {
- LOG.info("Metric Service from bundlecontext");
- return (MetricServiceImpl) bctx.getService(sref);
-
- } else {
- LOG.info("Metric Service error from bundlecontext");
- LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService");
- return null;
-
- }
- }
-
- public Metric getMetric(String name){
- return metricRegistry.metric(name);
- }
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
deleted file mode 100644
index 7ed06a9e3..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap.impl;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-//import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-//import com.att.nsa.cambria.client.CambriaClientBuilders;
-//import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRClientFactory;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.appc.adapter.message.Producer;
-import org.onap.appc.adapter.messaging.dmaap.impl.DmaapUtil;
-import org.onap.appc.configuration.Configuration;
-import org.onap.appc.configuration.ConfigurationFactory;
-import org.onap.appc.metricservice.MetricRegistry;
-import org.onap.appc.metricservice.MetricService;
-import org.onap.appc.metricservice.metric.Metric;
-import org.onap.appc.metricservice.metric.MetricType;
-import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
-import org.onap.appc.metricservice.policy.PublishingPolicy;
-import org.onap.appc.metricservice.publisher.LogPublisher;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.ServiceReference;
-
-public class DmaapProducerImpl implements Producer {
-
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class);
- private static final Configuration configuration = ConfigurationFactory.getConfiguration();
-
- private Set<String> topics = new HashSet<String>();
-
- private Properties props = null;
- private static MetricRegistry metricRegistry;
- private boolean useHttps = false;
- private DmaapRequestCounterMetric dmaapKpiMetric;
- private boolean isMetricEnabled=false;
-
- private Set<MRBatchingPublisher> clients;
-
-
- public DmaapProducerImpl(Collection<String> urls, String topicName, String user, String password) {
- this(urls, (Set<String>)null, user, password);
- this.topics = new HashSet<>();
- if (topicName != null) {
- for (String topic : topicName.split(",")) {
- topics.add(topic);
- }
- }
- }
-
- public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) {
- topics = topicNames;
- if(urls == null || user == null || password == null){
- throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password" );
- }
- this.props = new Properties();
- String urlsStr = StringUtils.join(urls, ',');
- props.setProperty("host",urlsStr);
- props.setProperty("id", UUID.randomUUID().toString());
- props.setProperty("username",user);
- props.setProperty("password",password);
- }
- private void initMetric() {
- LOG.debug("Metric getting initialized");
- MetricService metricService = getMetricservice();
- metricRegistry=metricService.createRegistry("APPC");
- dmaapKpiMetric = metricRegistry.metricBuilderFactory().
- dmaapRequestCounterBuilder().
- withName("DMAAP_KPI").withType(MetricType.COUNTER).
- withRecievedMessage(0)
- .withPublishedMessage(0)
- .build();
- if(metricRegistry.register(dmaapKpiMetric)) {
- Metric[] metrics = new Metric[]{dmaapKpiMetric};
- LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
- LogPublisher[] logPublishers = new LogPublisher[1];
- logPublishers[0] = logPublisher;
- PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory().
- scheduledPolicyBuilder().withPublishers(logPublishers).
- withMetrics(metrics).
- build();
- LOG.debug("Policy getting initialized");
- manuallyScheduledPublishingPolicy.init();
- LOG.debug("Metric initialized");
- }
-
- }
- private Set<MRBatchingPublisher> getClients() {
- Set<MRBatchingPublisher> out = new HashSet<MRBatchingPublisher>();
- for (String topic : topics) {
- try {
- String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props);
- final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher (topicProducerPropFileName);
- out.add(client);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- return out;
- }
-
- @Override
- public synchronized void updateCredentials(String key, String secret) {
- LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
- String user = key;
- String password = secret;
- props.setProperty("user",String.valueOf(user));
- props.setProperty("password",String.valueOf(password));
- clients = null;
- }
-
- @Override
- public boolean post(String partition, String data) {
- boolean success = true;
- Properties properties=configuration.getProperties();
- if(properties!=null && properties.getProperty("metric.enabled")!=null ){
- isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled"));
- }
- if(isMetricEnabled){
- initMetric();
- }
-
- // Create clients once and reuse them on subsequent posts. This is
- // to support failover to other servers in the Dmaap cluster.
- if ((clients == null) || (clients.isEmpty())) {
- LOG.info("Getting CambriaBatchingPublisher Clients ...");
- clients = getClients();
- }
-
- for (MRBatchingPublisher client : clients) {
- try {
- LOG.debug(String.format("Posting %s to %s", data, client));
- client.send(partition, data);
- } catch (IOException e) {
- e.printStackTrace();
- success = false;
- }
- }
- if(isMetricEnabled){
- ( (DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage();
- }
- return success;
- }
-
- /**
- * Close producer Dmaap client
- */
- @Override
- public void close() {
- if ((clients == null) || (clients.isEmpty())) {
- return;
- }
-
- LOG.debug("Closing Dmaap producer clients....");
- for (MRBatchingPublisher client : clients) {
- try {
- client.close(1, TimeUnit.SECONDS);
- } catch (IOException | InterruptedException e) {
- LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client));
- e.printStackTrace();
- }
- }
- }
-
- @Override
- public void useHttps(boolean yes) {
- useHttps = yes;
- }
-
- private MetricService getMetricservice() {
-/*
- return AppcDmaapAdapterActivator.getMetricService();
-*/
-
- BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext();
- ServiceReference sref = bctx.getServiceReference(MetricService.class.getName());
- if (sref != null) {
- LOG.info("Metric Service from bundlecontext");
- return (MetricService) bctx.getService(sref);
-
- } else {
- LOG.info("Metric Service error from bundlecontext");
- LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService");
- return null;
-
- }
- }
-
- public Metric getMetric(String name){
- return metricRegistry.metric(name);
- }
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java
deleted file mode 100644
index da1b0fa38..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap.impl;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Properties;
-
-public class DmaapUtil {
- private final static String delimiter = "_";
- private static String createPreferredRouteFileIfNotExist(String topic) throws IOException {
- String topicPreferredRouteFileName = null;
- topicPreferredRouteFileName = topic+"preferredRoute.properties";
- File fo= new File(topicPreferredRouteFileName);
- if(!fo.exists()) {
- ClassLoader classLoader = DmaapUtil.class.getClassLoader();
- InputStream inputStream = classLoader.getResourceAsStream("preferredRoute.txt");
- Properties props = new Properties();
- props.load(inputStream);
- String fileName = topic != null ? topic+delimiter+"MR1" : delimiter+"MR1";
- props.setProperty("preferredRouteKey", fileName);
- topicPreferredRouteFileName = topic + "preferredRoute.properties";
- props.store(new FileOutputStream(topicPreferredRouteFileName), "preferredRoute.properties file created on the fly for topic:" + topic + " on:" + System.currentTimeMillis());
- }
- return topicPreferredRouteFileName;
- }
-
- public static String createConsumerPropFile(String topic, Properties props)throws IOException {
- String defaultProfFileName = "consumer.properties";
- String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props);
- return topicConsumerPropFileName;
- }
-
- public static String createProducerPropFile(String topic, Properties props)throws IOException {
- String defaultProfFileName = "producer.properties";
- String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props);
- return topicConsumerPropFileName;
- }
-
- private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props) throws IOException {
- ClassLoader classLoader = DmaapUtil.class.getClassLoader();
- InputStream inputStream = classLoader.getResourceAsStream(defaultProfFileName);
- Properties defaultProps = new Properties();
- defaultProps.load(inputStream);
- defaultProps.setProperty("topic",topic);
-
- String preferredRouteFileName = DmaapUtil.createPreferredRouteFileIfNotExist(topic);
- if(props != null && !props.isEmpty()){
- defaultProps.putAll(props);
- }
- defaultProps.setProperty("topic",topic);
- defaultProps.setProperty("DME2preferredRouterFilePath",preferredRouteFileName);
- String id = defaultProps.getProperty("id");
- String topicConsumerPropFileName = defaultProfFileName;
- topicConsumerPropFileName = id != null ? id+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName;
- topicConsumerPropFileName = topic != null ? topic+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName;
-
- defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName+" file created on the fly for topic:"+topic+" on:"+System.currentTimeMillis());
- return topicConsumerPropFileName;
- }
-
-}
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java
deleted file mode 100644
index bf7649026..000000000
--- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.adapter.messaging.dmaap.impl;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.onap.appc.adapter.message.EventSender;
-import org.onap.appc.adapter.message.MessageDestination;
-import org.onap.appc.adapter.message.Producer;
-import org.onap.appc.adapter.message.event.EventHeader;
-import org.onap.appc.adapter.message.event.EventMessage;
-import org.onap.appc.adapter.message.event.EventStatus;
-import org.onap.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl;
-import org.onap.appc.configuration.Configuration;
-import org.onap.appc.configuration.ConfigurationFactory;
-import org.onap.appc.exceptions.APPCException;
-
-public class EventSenderDmaapImpl implements EventSender
-{
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class);
- public static final String EVENT_TOPIC_WRITE = "dmaap.event.topic.write";
- public static final String DMAAP_USERNAME = "dmaap.appc.username";
- public static final String DMAAP_PASSWORD = "dmaap.appc.password";
- public static final String EVENT_POOL_MEMBERS = "dmaap.event.pool.members";
-
- private static Configuration configuration = ConfigurationFactory.getConfiguration();
-
- private Map<String,Producer> producerMap = new ConcurrentHashMap<>();
-
- public Map<String, Producer> getProducerMap() {
- return producerMap;
- }
-
- public void setProducerMap(Map<String, Producer> producerMap) {
- this.producerMap = producerMap;
- }
-
- public EventSenderDmaapImpl(){
-
- }
-
- public void initialize(){
- Properties properties = configuration.getProperties();
- String writeTopic;
- String username;
- String password;
- final List<String> pool = new ArrayList<>();
-
- for(MessageDestination destination: MessageDestination.values()){
- writeTopic = properties.getProperty(destination + "." + EVENT_TOPIC_WRITE);
- username = properties.getProperty(destination + "." + DMAAP_USERNAME);
- password = properties.getProperty(destination + "." + DMAAP_PASSWORD);
- String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS);
-
- if (hostNames != null && !hostNames.isEmpty()) {
- LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS));
- Collections.addAll(pool, hostNames.split(","));
- }
-
- LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS));
- LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE));
- LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME));
- Producer producer = new DmaapProducerImpl(pool, writeTopic,username, password);
-
- for (String url : pool) {
- if (url.contains("3905") || url.contains("https")) {
- LOG.debug("Producer should use HTTPS");
- producer.useHttps(true);
- break;
- }
- }
- producerMap.put(destination.toString(),producer);
- }
-
- }
-
- @Override
- public boolean sendEvent(MessageDestination destination, EventMessage msg) {
- String jsonStr = msg.toJson();
- String id = msg.getEventHeader().getEventId();
- LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr));
- Producer producer = producerMap.get(destination.toString());
- return producer.post(id, jsonStr);
- }
-
- @Override
- public boolean sendEvent(MessageDestination destination, EventMessage msg, String eventTopicName) {
- String jsonStr = msg.toJson();
- String id = msg.getEventHeader().getEventId();
- LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr));
- Producer producer = createProducer(destination, eventTopicName);
- return producer.post(id, jsonStr);
- }
-
- private Producer createProducer(MessageDestination destination, String eventTopicName) {
- Properties properties = configuration.getProperties();
- final List<String> pool = new ArrayList<>();
- String username = properties.getProperty(destination + "." + DMAAP_USERNAME);
- String password = properties.getProperty(destination + "." + DMAAP_PASSWORD);
- String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS);
-
- if (hostNames != null && !hostNames.isEmpty()) {
- LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS));
- Collections.addAll(pool, hostNames.split(","));
- }
-
- LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS));
- LOG.debug(String.format("writeTopic = %s, taken from property: %s", eventTopicName, destination + "." + EVENT_TOPIC_WRITE));
- LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME));
- Producer producer = new DmaapProducerImpl(pool, eventTopicName,username, password);
-
- for (String url : pool) {
- if (url.contains("3905") || url.contains("https")) {
- LOG.debug("Producer should use HTTPS");
- producer.useHttps(true);
- break;
- }
- }
- return producer;
- }
-
- @Override
- public boolean sendEvent(MessageDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException {
-
- if (params == null) {
- String message = "Parameters map is empty (null)";
- LOG.error(message);
- throw new APPCException(message);
- }
- String eventTime = new Date(System.currentTimeMillis()).toString();
- String apiVer = params.get("apiVer");
- String eventId = params.get("eventId");
- String reason = params.get("reason");
- String entityId=params.get("entityId");
- if(entityId!=null){
- reason=reason+"("+entityId+")";
- }
- Integer code = Integer.getInteger(params.get("code"), 500);
-
- if (eventTime == null || apiVer == null || eventId == null || reason == null) {
- String message = String.format("Missing input parameters: %s", params);
- LOG.error(message);
- throw new APPCException(message);
- }
- EventMessage eventMessage = new EventMessage(
- new EventHeader(eventTime, apiVer, eventId),
- new EventStatus(code, reason));
-
- return sendEvent(destination,eventMessage);
- }
-}