From 161df8a94bb3b0c34ed16fd4fdba078bd1eeef9a Mon Sep 17 00:00:00 2001 From: Patrick Brady Date: Wed, 13 Dec 2017 11:14:21 -0800 Subject: Second part of onap rename This is the second commit of the rename. The folder structure is renamed for appc-adapters and appc-config in this commit. Change-Id: Iaa2b8c937ff1ca1b5d1178128961fb115ee65d9b Signed-off-by: Patrick Brady Issue-ID: APPC-13 --- .../messaging/dmaap/AppcDmaapAdapterActivator.java | 106 ++++++++++ .../messaging/dmaap/http/CommonHttpClient.java | 108 ++++++++++ .../dmaap/http/HttpDmaapConsumerImpl.java | 167 +++++++++++++++ .../dmaap/http/HttpDmaapProducerImpl.java | 138 ++++++++++++ .../messaging/dmaap/impl/DmaapConsumerImpl.java | 234 +++++++++++++++++++++ .../messaging/dmaap/impl/DmaapProducerImpl.java | 223 ++++++++++++++++++++ .../adapter/messaging/dmaap/impl/DmaapUtil.java | 86 ++++++++ .../messaging/dmaap/impl/EventSenderDmaapImpl.java | 178 ++++++++++++++++ .../messaging/dmaap/AppcDmaapAdapterActivator.java | 106 ---------- .../messaging/dmaap/http/CommonHttpClient.java | 108 ---------- .../dmaap/http/HttpDmaapConsumerImpl.java | 167 --------------- .../dmaap/http/HttpDmaapProducerImpl.java | 138 ------------ .../messaging/dmaap/impl/DmaapConsumerImpl.java | 234 --------------------- .../messaging/dmaap/impl/DmaapProducerImpl.java | 223 -------------------- .../adapter/messaging/dmaap/impl/DmaapUtil.java | 86 -------- .../messaging/dmaap/impl/EventSenderDmaapImpl.java | 178 ---------------- 16 files changed, 1240 insertions(+), 1240 deletions(-) create mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java create mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java create mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java create mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java create mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java create mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java create mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapUtil.java create mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java delete mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java delete mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java delete mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java delete mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java delete mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java delete mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java delete mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java delete mode 100644 appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java') diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java new file mode 100644 index 000000000..812f80121 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java @@ -0,0 +1,106 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.adapter.messaging.dmaap; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceRegistration; + + +/** + * This activator is used to initialize and terminate the connection pool to one or more providers. + *

+ * The CDP abstraction layer supports multiple types of providers, with each provider supporting multiple tenants. The + * "connection" to a specific tenant on a specific provider is represented by a "context" object. These context objects + * are authenticated to a specific tenant on the provider, but can be reused from one request to another. Contexts are + * slow to set up and are resource intensive, so they are cached. However, the contexts for a specific tenant on a + * specific provider must be cached separately. + *

+ *

+ * Activation of the bundle creates an empty cache which is organized first by provider type, then by tenant name, with + * the contents being an empty pool of contexts for that provider/tenant combination. The pool is created on first use, + * and retained for as long as the bundle is active. + *

+ *

+ * When the bundle is deactivated, the cache is torn down with all contexts being closed. + *

+ */ +public class AppcDmaapAdapterActivator implements BundleActivator { + private ServiceRegistration registration = null; + + /** + * The logger to be used + */ + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AppcDmaapAdapterActivator.class); + + /** + * Called when this bundle is started so the Framework can perform the bundle-specific activities necessary to start + * this bundle. This method can be used to register services or to allocate any resources that this bundle needs. + *

+ * This method must complete and return to its caller in a timely manner. + *

+ * + * @param bundleContext + * The execution context of the bundle being started. + * @throws java.lang.Exception + * If this method throws an exception, this bundle is marked as stopped and the Framework will remove + * this bundle's listeners, unregister all services registered by this bundle, and release all services + * used by this bundle. + * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext) + */ + @Override + public void start(final BundleContext bundleContext) throws Exception { + LOG.info("Starting Bundle " + getName()); + } + + /** + * Called when this bundle is stopped so the Framework can perform the bundle-specific activities necessary to stop + * the bundle. In general, this method should undo the work that the BundleActivator.start method started. There + * should be no active threads that were started by this bundle when this bundle returns. A stopped bundle must not + * call any Framework objects. + *

+ * This method must complete and return to its caller in a timely manner. + *

+ * + * @param ctx + * The execution context of the bundle being stopped. + * @throws java.lang.Exception + * If this method throws an exception, the bundle is still marked as stopped, and the Framework will + * remove the bundle's listeners, unregister all services registered by the bundle, and release all + * services used by the bundle. * + * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext) + */ + @Override + public void stop(BundleContext ctx) throws Exception { + LOG.info("Stopped Bundle " + getName()); + } + + public String getName() { + return "DMaaP Adapter"; + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java new file mode 100644 index 000000000..76b050d8e --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/CommonHttpClient.java @@ -0,0 +1,108 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.adapter.messaging.dmaap.http; + +import java.net.URI; + +import org.apache.commons.codec.binary.Base64; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.config.RequestConfig.Builder; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; + +public class CommonHttpClient { + + public static final int HTTPS_PORT = 3905; + + private String AUTH_STR; + + protected void setBasicAuth(String username, String password) { + if (username != null && password != null) { + String plain = String.format("%s:%s", username, password); + AUTH_STR = Base64.encodeBase64String(plain.getBytes()); + } else { + AUTH_STR = null; + } + } + + public HttpGet getReq(URI uri, int timeoutMs) throws Exception { + HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri); + if (AUTH_STR != null) { + out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); + } + out.setConfig(getConfig(timeoutMs)); + return out; + } + + public HttpPost postReq(String url) throws Exception { + HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url); + if (AUTH_STR != null) { + out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); + } + out.setConfig(getConfig(0)); + return out; + } + + private RequestConfig getConfig(int timeoutMs) { + Builder builder = RequestConfig.custom(); + builder.setSocketTimeout(timeoutMs + 5000); + return builder.build(); + } + + public CloseableHttpClient getClient() { + return getClient(false); + } + + public CloseableHttpClient getClient(boolean useHttps) { + return HttpClientBuilder.create().build(); + } + + public String formatHostString(String host) { + return formatHostString(host, host.contains(String.valueOf(HTTPS_PORT))); + } + + public String formatHostString(String host, boolean useHttps) { + // Trim trailing slash + String out = host.endsWith("/") ? host.substring(0, host.length() - 1) : host; + + boolean hasProto = out.startsWith("http"); + boolean hasPort = out.contains(":"); + + // Add protocol + if (!hasProto) { + out = String.format("%s%s", (useHttps) ? "https://" : "http://", out); + } + + // Add port + if (!hasPort) { + out = String.format("%s:%d", out, (useHttps) ? 3905 : 3904); + } + + return out; + + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java new file mode 100644 index 000000000..df81e9718 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java @@ -0,0 +1,167 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.adapter.messaging.dmaap.http; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import org.apache.http.HttpEntity; +import org.apache.http.NameValuePair; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; +import org.json.JSONArray; +import org.onap.appc.adapter.message.Consumer; + +public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer { + + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); + + // Default values + private static final int DEFAULT_TIMEOUT_MS = 15000; + private static final int DEFAULT_LIMIT = 1000; + private static final String HTTPS_PORT = ":3905"; + private static final String URL_TEMPLATE = "%s/events/%s/%s/%s"; + + private List urls; + private String filter; + + private boolean useHttps = false; + + public HttpDmaapConsumerImpl(Collection hosts, String topicName, String consumerName, String consumerId) { + this(hosts, topicName, consumerName, consumerId, null); + } + + public HttpDmaapConsumerImpl(Collection hosts, String topicName, String consumerName, String consumerId, + String filter) { + this(hosts, topicName, consumerName, consumerId, filter, null, null); + } + + public HttpDmaapConsumerImpl(Collection hosts, String topicName, String consumerName, String consumerId, + String filter, String user, String password) { + urls = new ArrayList(); + for (String host : hosts) { + urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId)); + } + this.filter = filter; + updateCredentials(user, password); + } + + @Override + public void updateCredentials(String user, String pass) { + LOG.debug(String.format("Setting auth to %s for %s", user, this.toString())); + this.setBasicAuth(user, pass); + } + + @Override + public List fetch(int waitMs, int limit) { + LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); + List out = new ArrayList(); + try { + List urlParams = new ArrayList(); + urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs))); + urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit))); + if (filter != null) { + urlParams.add(new BasicNameValuePair("filter", filter)); + } + + URIBuilder builder = new URIBuilder(urls.get(0)); + builder.setParameters(urlParams); + + URI uri = builder.build(); + LOG.info(String.format("GET %s", uri)); + HttpGet request = getReq(uri, waitMs); + CloseableHttpResponse response = getClient().execute(request); + + int httpStatus = response.getStatusLine().getStatusCode(); + HttpEntity entity = response.getEntity(); + String body = (entity != null) ? EntityUtils.toString(entity) : null; + + LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus, + (body != null ? body.length() : "null"))); + + response.close(); + if (httpStatus == 200 && body != null) { + JSONArray json = new JSONArray(body); + LOG.info(String.format("Got %d messages from DMaaP", json.length())); + for (int i = 0; i < json.length(); i++) { + out.add(json.getString(i)); + } + } else { + LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body)); + sleep(waitMs); + } + } catch (Exception e) { + if (urls.size() > 1) { + String failedUrl = urls.remove(0); + urls.add(failedUrl); + LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl, + urls.get(0))); + } + LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e); + sleep(waitMs); + } + + return out; + } + + @Override + public List fetch() { + return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); + } + + @Override + public String toString() { + String hostStr = (urls == null || urls.isEmpty()) ? "N/A" : urls.get(0); + return String.format("Consumer listening to [%s]", hostStr); + } + + @Override + public void useHttps(boolean yes) { + useHttps = yes; + } + + private void sleep(int ms) { + LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000)); + try { + Thread.sleep(ms); + } catch (InterruptedException e1) { + LOG.error("Interrupted while sleeping"); + } + } + + @Override + public void close() { + // Nothing to do + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java new file mode 100644 index 000000000..560c09be4 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java @@ -0,0 +1,138 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.adapter.messaging.dmaap.http; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.onap.appc.adapter.message.Producer; + +public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer { + + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class); + + private static final String CONTENT_TYPE = "application/cambria"; + private static final String URL_TEMPLATE = "%s/events/%s"; + + private List hosts; + private Set topics; + + private boolean useHttps = false; + + public HttpDmaapProducerImpl(Collection urls, String topicName) { + hosts = new ArrayList(); + topics = new HashSet(); + topics.add(topicName); + + for (String host : urls) { + hosts.add(formatHostString(host)); + } + } + + public HttpDmaapProducerImpl(Collection urls, Set topicNames) { + hosts = new ArrayList(); + topics = topicNames; + + for (String host : urls) { + hosts.add(formatHostString(host)); + } + } + + @Override + public void updateCredentials(String user, String pass) { + LOG.debug(String.format("Setting auth to %s for %s", user, this.toString())); + this.setBasicAuth(user, pass); + } + + @Override + public boolean post(String partition, String data) { + int sent = 0; + try { + HttpPost request = postReq(null); + request.setHeader("Content-Type", CONTENT_TYPE); + request.setEntity(new StringEntity(bodyLine(partition, data))); + + for (String topic : topics) { + String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic); + try { + request.setURI(new URI(uriStr)); + CloseableHttpResponse response = getClient().execute(request); + if (response.getStatusLine().getStatusCode() == 200) { + sent++; + } + response.close(); + } catch (Exception sendEx) { + LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()), + sendEx); + if (hosts.size() > 1) { + String failedUrl = hosts.remove(0); + hosts.add(failedUrl); + LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", + failedUrl, hosts.get(0))); + } + } + } + } catch (Exception buildEx) { + LOG.error( + String.format("Failed to build request with string [%s]. Message not sent to any topic. Reason: %s", + data, buildEx.getMessage()), + buildEx); + } + return sent == topics.size(); + } + + @Override + public void useHttps(boolean yes) { + useHttps = yes; + } + + /** + * Format the body for the application/cambria content type with no partitioning. See + * + * @param msg + * The message body to format + * @return A string in the application/cambria content type + */ + private String bodyLine(String partition, String msg) { + String p = (partition == null) ? "" : partition; + String m = (msg == null) ? "" : msg; + return String.format("%d.%d.%s%s", p.length(), m.length(), p, m); + } + + @Override + public void close() { + // Nothing to do + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java new file mode 100644 index 000000000..ddf630545 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java @@ -0,0 +1,234 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.adapter.messaging.dmaap.impl; + +import java.io.IOException; +import java.util.*; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +//import com.att.nsa.cambria.client.CambriaClientBuilders; +//import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +//import com.att.nsa.cambria.client.CambriaConsumer; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import org.apache.commons.lang3.StringUtils; +import org.onap.appc.adapter.message.Consumer; +import org.onap.appc.configuration.Configuration; +import org.onap.appc.configuration.ConfigurationFactory; +import org.onap.appc.metricservice.MetricRegistry; +import org.onap.appc.metricservice.MetricService; +import org.onap.appc.metricservice.impl.MetricServiceImpl; +import org.onap.appc.metricservice.metric.Metric; +import org.onap.appc.metricservice.metric.MetricType; +import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric; +import org.onap.appc.metricservice.policy.PublishingPolicy; +import org.onap.appc.metricservice.publisher.LogPublisher; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.ServiceReference; + +public class DmaapConsumerImpl implements Consumer { + + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class); + private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + // Default values + private static final int DEFAULT_TIMEOUT_MS = 60000; + private static final int DEFAULT_LIMIT = 1000; + private static MetricRegistry metricRegistry; + private String topic; + private DmaapRequestCounterMetric dmaapKpiMetric; + private boolean isMetricEnabled=false; + private boolean useHttps = false; + private MRConsumer client = null; + private Properties props = null; + + + public DmaapConsumerImpl(Collection urls, String topicName, String consumerGroupName, String consumerId,String user, String password) { + this(urls, topicName, consumerGroupName, consumerId,user, password,null); + + } + + public DmaapConsumerImpl(Collection urls, String topicName, String consumerGroupName, String consumerId,String user, String password,String filter) { + this.topic = topicName; + this.props = new Properties(); + String urlsStr = StringUtils.join(urls, ','); + props.setProperty("host",urlsStr); + props.setProperty("group",consumerGroupName); + props.setProperty("id",consumerId); + props.setProperty("username",user); + props.setProperty("password",password); + if(filter != null) { + props.setProperty("filter", filter); + } + } + + + private void initMetric() { + LOG.debug("Metric getting initialized"); + MetricService metricService = getMetricservice(); + metricRegistry = metricService.createRegistry("APPC"); + dmaapKpiMetric = metricRegistry.metricBuilderFactory(). + dmaapRequestCounterBuilder(). + withName("DMAAP_KPI").withType(MetricType.COUNTER). + withRecievedMessage(0) + .withPublishedMessage(0) + .build(); + if (metricRegistry.register(dmaapKpiMetric)) { + Metric[] metrics = new Metric[]{dmaapKpiMetric}; + LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); + LogPublisher[] logPublishers = new LogPublisher[1]; + logPublishers[0] = logPublisher; + PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). + scheduledPolicyBuilder().withPublishers(logPublishers). + withMetrics(metrics). + build(); + LOG.debug("Policy getting initialized"); + manuallyScheduledPublishingPolicy.init(); + LOG.debug("Metric initialized"); + } + } + private MRConsumer getClient() { + return getClient(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); + } + + /** + * @return An instance of MRConsumer created from our class variables + */ + private synchronized MRConsumer getClient(int waitMs, int limit) { + try { + props.setProperty("timeout",String.valueOf(waitMs)); + props.setProperty("limit",String.valueOf(limit)); + String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props); + return MRClientFactory.createConsumer ( topicProducerPropFileName); + } catch (IOException e1) { + LOG.error("failed to createConsumer",e1); + return null; + } + } + + @Override + public synchronized void updateCredentials(String key, String secret) { + LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); + String user = key; + String password = secret; + props.setProperty("user",String.valueOf(user)); + props.setProperty("password",String.valueOf(password)); + client = null; + } + + @Override + public List fetch(int waitMs, int limit) { + Properties properties=configuration.getProperties(); + if(properties!=null && properties.getProperty("metric.enabled")!=null ){ + isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); + } + if(isMetricEnabled){ + initMetric(); + } + LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); + List out = new ArrayList(); + + // Create client once and reuse it on subsequent fetches. This is + // to support failover to other servers in the DMaaP cluster. + if (client == null) { + LOG.info("Getting DMaaP Client ..."); + client = getClient(waitMs, limit); + } + try { + for (String s : client.fetch(waitMs, limit)) { + out.add(s); + if(isMetricEnabled){ + ((DmaapRequestCounterMetric)metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage(); + } + } + LOG.debug(String.format("Got %d records from %s", out.size(), this.toString())); + } catch (Exception e) { + // Connection exception + LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage())); + e.printStackTrace(); + try { + LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs)); + Thread.sleep(waitMs); + } catch (InterruptedException e2) { + LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs)); + } + } + + + return out; + } + + /** + * Close consumer Dmaap client + */ + @Override + public void close() { + LOG.debug("Closing Dmaap consumer client...."); + if (client != null) { + client.close(); + } + } + + @Override + public List fetch() { + return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); + } + + @Override + public String toString() { + String hostStr = (props == null || props.getProperty("host") == null? "N/A" : props.getProperty("host")); + String group = (props == null || props.getProperty("group") == null? "N/A" : props.getProperty("group")); + String id = (props == null || props.getProperty("id") == null? "N/A" : props.getProperty("id")); + return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr); + } + + @Override + public void useHttps(boolean yes) { + useHttps = yes; + } + + + private MetricService getMetricservice() { + BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); + // Get AAIadapter reference + ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); + if (sref != null) { + LOG.info("Metric Service from bundlecontext"); + return (MetricServiceImpl) bctx.getService(sref); + + } else { + LOG.info("Metric Service error from bundlecontext"); + LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService"); + return null; + + } + } + + public Metric getMetric(String name){ + return metricRegistry.metric(name); + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java new file mode 100644 index 000000000..7ed06a9e3 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java @@ -0,0 +1,223 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.adapter.messaging.dmaap.impl; + +import java.io.*; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +//import com.att.nsa.cambria.client.CambriaBatchingPublisher; +//import com.att.nsa.cambria.client.CambriaClientBuilders; +//import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import org.apache.commons.lang3.StringUtils; +import org.onap.appc.adapter.message.Producer; +import org.onap.appc.adapter.messaging.dmaap.impl.DmaapUtil; +import org.onap.appc.configuration.Configuration; +import org.onap.appc.configuration.ConfigurationFactory; +import org.onap.appc.metricservice.MetricRegistry; +import org.onap.appc.metricservice.MetricService; +import org.onap.appc.metricservice.metric.Metric; +import org.onap.appc.metricservice.metric.MetricType; +import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric; +import org.onap.appc.metricservice.policy.PublishingPolicy; +import org.onap.appc.metricservice.publisher.LogPublisher; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.ServiceReference; + +public class DmaapProducerImpl implements Producer { + + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class); + private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + + private Set topics = new HashSet(); + + private Properties props = null; + private static MetricRegistry metricRegistry; + private boolean useHttps = false; + private DmaapRequestCounterMetric dmaapKpiMetric; + private boolean isMetricEnabled=false; + + private Set clients; + + + public DmaapProducerImpl(Collection urls, String topicName, String user, String password) { + this(urls, (Set)null, user, password); + this.topics = new HashSet<>(); + if (topicName != null) { + for (String topic : topicName.split(",")) { + topics.add(topic); + } + } + } + + public DmaapProducerImpl(Collection urls, Set topicNames, String user, String password) { + topics = topicNames; + if(urls == null || user == null || password == null){ + throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password" ); + } + this.props = new Properties(); + String urlsStr = StringUtils.join(urls, ','); + props.setProperty("host",urlsStr); + props.setProperty("id", UUID.randomUUID().toString()); + props.setProperty("username",user); + props.setProperty("password",password); + } + private void initMetric() { + LOG.debug("Metric getting initialized"); + MetricService metricService = getMetricservice(); + metricRegistry=metricService.createRegistry("APPC"); + dmaapKpiMetric = metricRegistry.metricBuilderFactory(). + dmaapRequestCounterBuilder(). + withName("DMAAP_KPI").withType(MetricType.COUNTER). + withRecievedMessage(0) + .withPublishedMessage(0) + .build(); + if(metricRegistry.register(dmaapKpiMetric)) { + Metric[] metrics = new Metric[]{dmaapKpiMetric}; + LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); + LogPublisher[] logPublishers = new LogPublisher[1]; + logPublishers[0] = logPublisher; + PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). + scheduledPolicyBuilder().withPublishers(logPublishers). + withMetrics(metrics). + build(); + LOG.debug("Policy getting initialized"); + manuallyScheduledPublishingPolicy.init(); + LOG.debug("Metric initialized"); + } + + } + private Set getClients() { + Set out = new HashSet(); + for (String topic : topics) { + try { + String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props); + final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher (topicProducerPropFileName); + out.add(client); + } catch (Exception e) { + e.printStackTrace(); + } + } + + return out; + } + + @Override + public synchronized void updateCredentials(String key, String secret) { + LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); + String user = key; + String password = secret; + props.setProperty("user",String.valueOf(user)); + props.setProperty("password",String.valueOf(password)); + clients = null; + } + + @Override + public boolean post(String partition, String data) { + boolean success = true; + Properties properties=configuration.getProperties(); + if(properties!=null && properties.getProperty("metric.enabled")!=null ){ + isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); + } + if(isMetricEnabled){ + initMetric(); + } + + // Create clients once and reuse them on subsequent posts. This is + // to support failover to other servers in the Dmaap cluster. + if ((clients == null) || (clients.isEmpty())) { + LOG.info("Getting CambriaBatchingPublisher Clients ..."); + clients = getClients(); + } + + for (MRBatchingPublisher client : clients) { + try { + LOG.debug(String.format("Posting %s to %s", data, client)); + client.send(partition, data); + } catch (IOException e) { + e.printStackTrace(); + success = false; + } + } + if(isMetricEnabled){ + ( (DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage(); + } + return success; + } + + /** + * Close producer Dmaap client + */ + @Override + public void close() { + if ((clients == null) || (clients.isEmpty())) { + return; + } + + LOG.debug("Closing Dmaap producer clients...."); + for (MRBatchingPublisher client : clients) { + try { + client.close(1, TimeUnit.SECONDS); + } catch (IOException | InterruptedException e) { + LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client)); + e.printStackTrace(); + } + } + } + + @Override + public void useHttps(boolean yes) { + useHttps = yes; + } + + private MetricService getMetricservice() { +/* + return AppcDmaapAdapterActivator.getMetricService(); +*/ + + BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); + ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); + if (sref != null) { + LOG.info("Metric Service from bundlecontext"); + return (MetricService) bctx.getService(sref); + + } else { + LOG.info("Metric Service error from bundlecontext"); + LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService"); + return null; + + } + } + + public Metric getMetric(String name){ + return metricRegistry.metric(name); + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapUtil.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapUtil.java new file mode 100644 index 000000000..da1b0fa38 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapUtil.java @@ -0,0 +1,86 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.adapter.messaging.dmaap.impl; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +public class DmaapUtil { + private final static String delimiter = "_"; + private static String createPreferredRouteFileIfNotExist(String topic) throws IOException { + String topicPreferredRouteFileName = null; + topicPreferredRouteFileName = topic+"preferredRoute.properties"; + File fo= new File(topicPreferredRouteFileName); + if(!fo.exists()) { + ClassLoader classLoader = DmaapUtil.class.getClassLoader(); + InputStream inputStream = classLoader.getResourceAsStream("preferredRoute.txt"); + Properties props = new Properties(); + props.load(inputStream); + String fileName = topic != null ? topic+delimiter+"MR1" : delimiter+"MR1"; + props.setProperty("preferredRouteKey", fileName); + topicPreferredRouteFileName = topic + "preferredRoute.properties"; + props.store(new FileOutputStream(topicPreferredRouteFileName), "preferredRoute.properties file created on the fly for topic:" + topic + " on:" + System.currentTimeMillis()); + } + return topicPreferredRouteFileName; + } + + public static String createConsumerPropFile(String topic, Properties props)throws IOException { + String defaultProfFileName = "consumer.properties"; + String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props); + return topicConsumerPropFileName; + } + + public static String createProducerPropFile(String topic, Properties props)throws IOException { + String defaultProfFileName = "producer.properties"; + String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props); + return topicConsumerPropFileName; + } + + private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props) throws IOException { + ClassLoader classLoader = DmaapUtil.class.getClassLoader(); + InputStream inputStream = classLoader.getResourceAsStream(defaultProfFileName); + Properties defaultProps = new Properties(); + defaultProps.load(inputStream); + defaultProps.setProperty("topic",topic); + + String preferredRouteFileName = DmaapUtil.createPreferredRouteFileIfNotExist(topic); + if(props != null && !props.isEmpty()){ + defaultProps.putAll(props); + } + defaultProps.setProperty("topic",topic); + defaultProps.setProperty("DME2preferredRouterFilePath",preferredRouteFileName); + String id = defaultProps.getProperty("id"); + String topicConsumerPropFileName = defaultProfFileName; + topicConsumerPropFileName = id != null ? id+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName; + topicConsumerPropFileName = topic != null ? topic+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName; + + defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName+" file created on the fly for topic:"+topic+" on:"+System.currentTimeMillis()); + return topicConsumerPropFileName; + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java new file mode 100644 index 000000000..bf7649026 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java @@ -0,0 +1,178 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.adapter.messaging.dmaap.impl; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.onap.ccsdk.sli.core.sli.SvcLogicContext; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import org.onap.appc.adapter.message.EventSender; +import org.onap.appc.adapter.message.MessageDestination; +import org.onap.appc.adapter.message.Producer; +import org.onap.appc.adapter.message.event.EventHeader; +import org.onap.appc.adapter.message.event.EventMessage; +import org.onap.appc.adapter.message.event.EventStatus; +import org.onap.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; +import org.onap.appc.configuration.Configuration; +import org.onap.appc.configuration.ConfigurationFactory; +import org.onap.appc.exceptions.APPCException; + +public class EventSenderDmaapImpl implements EventSender +{ + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class); + public static final String EVENT_TOPIC_WRITE = "dmaap.event.topic.write"; + public static final String DMAAP_USERNAME = "dmaap.appc.username"; + public static final String DMAAP_PASSWORD = "dmaap.appc.password"; + public static final String EVENT_POOL_MEMBERS = "dmaap.event.pool.members"; + + private static Configuration configuration = ConfigurationFactory.getConfiguration(); + + private Map producerMap = new ConcurrentHashMap<>(); + + public Map getProducerMap() { + return producerMap; + } + + public void setProducerMap(Map producerMap) { + this.producerMap = producerMap; + } + + public EventSenderDmaapImpl(){ + + } + + public void initialize(){ + Properties properties = configuration.getProperties(); + String writeTopic; + String username; + String password; + final List pool = new ArrayList<>(); + + for(MessageDestination destination: MessageDestination.values()){ + writeTopic = properties.getProperty(destination + "." + EVENT_TOPIC_WRITE); + username = properties.getProperty(destination + "." + DMAAP_USERNAME); + password = properties.getProperty(destination + "." + DMAAP_PASSWORD); + String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS); + + if (hostNames != null && !hostNames.isEmpty()) { + LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS)); + Collections.addAll(pool, hostNames.split(",")); + } + + LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); + LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE)); + LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); + Producer producer = new DmaapProducerImpl(pool, writeTopic,username, password); + + for (String url : pool) { + if (url.contains("3905") || url.contains("https")) { + LOG.debug("Producer should use HTTPS"); + producer.useHttps(true); + break; + } + } + producerMap.put(destination.toString(),producer); + } + + } + + @Override + public boolean sendEvent(MessageDestination destination, EventMessage msg) { + String jsonStr = msg.toJson(); + String id = msg.getEventHeader().getEventId(); + LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr)); + Producer producer = producerMap.get(destination.toString()); + return producer.post(id, jsonStr); + } + + @Override + public boolean sendEvent(MessageDestination destination, EventMessage msg, String eventTopicName) { + String jsonStr = msg.toJson(); + String id = msg.getEventHeader().getEventId(); + LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr)); + Producer producer = createProducer(destination, eventTopicName); + return producer.post(id, jsonStr); + } + + private Producer createProducer(MessageDestination destination, String eventTopicName) { + Properties properties = configuration.getProperties(); + final List pool = new ArrayList<>(); + String username = properties.getProperty(destination + "." + DMAAP_USERNAME); + String password = properties.getProperty(destination + "." + DMAAP_PASSWORD); + String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS); + + if (hostNames != null && !hostNames.isEmpty()) { + LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS)); + Collections.addAll(pool, hostNames.split(",")); + } + + LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); + LOG.debug(String.format("writeTopic = %s, taken from property: %s", eventTopicName, destination + "." + EVENT_TOPIC_WRITE)); + LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); + Producer producer = new DmaapProducerImpl(pool, eventTopicName,username, password); + + for (String url : pool) { + if (url.contains("3905") || url.contains("https")) { + LOG.debug("Producer should use HTTPS"); + producer.useHttps(true); + break; + } + } + return producer; + } + + @Override + public boolean sendEvent(MessageDestination destination, Map params, SvcLogicContext ctx) throws APPCException { + + if (params == null) { + String message = "Parameters map is empty (null)"; + LOG.error(message); + throw new APPCException(message); + } + String eventTime = new Date(System.currentTimeMillis()).toString(); + String apiVer = params.get("apiVer"); + String eventId = params.get("eventId"); + String reason = params.get("reason"); + String entityId=params.get("entityId"); + if(entityId!=null){ + reason=reason+"("+entityId+")"; + } + Integer code = Integer.getInteger(params.get("code"), 500); + + if (eventTime == null || apiVer == null || eventId == null || reason == null) { + String message = String.format("Missing input parameters: %s", params); + LOG.error(message); + throw new APPCException(message); + } + EventMessage eventMessage = new EventMessage( + new EventHeader(eventTime, apiVer, eventId), + new EventStatus(code, reason)); + + return sendEvent(destination,eventMessage); + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java deleted file mode 100644 index 812f80121..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java +++ /dev/null @@ -1,106 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.osgi.framework.BundleActivator; -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceRegistration; - - -/** - * This activator is used to initialize and terminate the connection pool to one or more providers. - *

- * The CDP abstraction layer supports multiple types of providers, with each provider supporting multiple tenants. The - * "connection" to a specific tenant on a specific provider is represented by a "context" object. These context objects - * are authenticated to a specific tenant on the provider, but can be reused from one request to another. Contexts are - * slow to set up and are resource intensive, so they are cached. However, the contexts for a specific tenant on a - * specific provider must be cached separately. - *

- *

- * Activation of the bundle creates an empty cache which is organized first by provider type, then by tenant name, with - * the contents being an empty pool of contexts for that provider/tenant combination. The pool is created on first use, - * and retained for as long as the bundle is active. - *

- *

- * When the bundle is deactivated, the cache is torn down with all contexts being closed. - *

- */ -public class AppcDmaapAdapterActivator implements BundleActivator { - private ServiceRegistration registration = null; - - /** - * The logger to be used - */ - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AppcDmaapAdapterActivator.class); - - /** - * Called when this bundle is started so the Framework can perform the bundle-specific activities necessary to start - * this bundle. This method can be used to register services or to allocate any resources that this bundle needs. - *

- * This method must complete and return to its caller in a timely manner. - *

- * - * @param bundleContext - * The execution context of the bundle being started. - * @throws java.lang.Exception - * If this method throws an exception, this bundle is marked as stopped and the Framework will remove - * this bundle's listeners, unregister all services registered by this bundle, and release all services - * used by this bundle. - * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext) - */ - @Override - public void start(final BundleContext bundleContext) throws Exception { - LOG.info("Starting Bundle " + getName()); - } - - /** - * Called when this bundle is stopped so the Framework can perform the bundle-specific activities necessary to stop - * the bundle. In general, this method should undo the work that the BundleActivator.start method started. There - * should be no active threads that were started by this bundle when this bundle returns. A stopped bundle must not - * call any Framework objects. - *

- * This method must complete and return to its caller in a timely manner. - *

- * - * @param ctx - * The execution context of the bundle being stopped. - * @throws java.lang.Exception - * If this method throws an exception, the bundle is still marked as stopped, and the Framework will - * remove the bundle's listeners, unregister all services registered by the bundle, and release all - * services used by the bundle. * - * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext) - */ - @Override - public void stop(BundleContext ctx) throws Exception { - LOG.info("Stopped Bundle " + getName()); - } - - public String getName() { - return "DMaaP Adapter"; - } - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java deleted file mode 100644 index 76b050d8e..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java +++ /dev/null @@ -1,108 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.http; - -import java.net.URI; - -import org.apache.commons.codec.binary.Base64; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.config.RequestConfig.Builder; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; - -public class CommonHttpClient { - - public static final int HTTPS_PORT = 3905; - - private String AUTH_STR; - - protected void setBasicAuth(String username, String password) { - if (username != null && password != null) { - String plain = String.format("%s:%s", username, password); - AUTH_STR = Base64.encodeBase64String(plain.getBytes()); - } else { - AUTH_STR = null; - } - } - - public HttpGet getReq(URI uri, int timeoutMs) throws Exception { - HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri); - if (AUTH_STR != null) { - out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); - } - out.setConfig(getConfig(timeoutMs)); - return out; - } - - public HttpPost postReq(String url) throws Exception { - HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url); - if (AUTH_STR != null) { - out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); - } - out.setConfig(getConfig(0)); - return out; - } - - private RequestConfig getConfig(int timeoutMs) { - Builder builder = RequestConfig.custom(); - builder.setSocketTimeout(timeoutMs + 5000); - return builder.build(); - } - - public CloseableHttpClient getClient() { - return getClient(false); - } - - public CloseableHttpClient getClient(boolean useHttps) { - return HttpClientBuilder.create().build(); - } - - public String formatHostString(String host) { - return formatHostString(host, host.contains(String.valueOf(HTTPS_PORT))); - } - - public String formatHostString(String host, boolean useHttps) { - // Trim trailing slash - String out = host.endsWith("/") ? host.substring(0, host.length() - 1) : host; - - boolean hasProto = out.startsWith("http"); - boolean hasPort = out.contains(":"); - - // Add protocol - if (!hasProto) { - out = String.format("%s%s", (useHttps) ? "https://" : "http://", out); - } - - // Add port - if (!hasPort) { - out = String.format("%s:%d", out, (useHttps) ? 3905 : 3904); - } - - return out; - - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java deleted file mode 100644 index df81e9718..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java +++ /dev/null @@ -1,167 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.http; - -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import org.apache.http.HttpEntity; -import org.apache.http.NameValuePair; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.message.BasicNameValuePair; -import org.apache.http.util.EntityUtils; -import org.json.JSONArray; -import org.onap.appc.adapter.message.Consumer; - -public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer { - - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); - - // Default values - private static final int DEFAULT_TIMEOUT_MS = 15000; - private static final int DEFAULT_LIMIT = 1000; - private static final String HTTPS_PORT = ":3905"; - private static final String URL_TEMPLATE = "%s/events/%s/%s/%s"; - - private List urls; - private String filter; - - private boolean useHttps = false; - - public HttpDmaapConsumerImpl(Collection hosts, String topicName, String consumerName, String consumerId) { - this(hosts, topicName, consumerName, consumerId, null); - } - - public HttpDmaapConsumerImpl(Collection hosts, String topicName, String consumerName, String consumerId, - String filter) { - this(hosts, topicName, consumerName, consumerId, filter, null, null); - } - - public HttpDmaapConsumerImpl(Collection hosts, String topicName, String consumerName, String consumerId, - String filter, String user, String password) { - urls = new ArrayList(); - for (String host : hosts) { - urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId)); - } - this.filter = filter; - updateCredentials(user, password); - } - - @Override - public void updateCredentials(String user, String pass) { - LOG.debug(String.format("Setting auth to %s for %s", user, this.toString())); - this.setBasicAuth(user, pass); - } - - @Override - public List fetch(int waitMs, int limit) { - LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); - List out = new ArrayList(); - try { - List urlParams = new ArrayList(); - urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs))); - urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit))); - if (filter != null) { - urlParams.add(new BasicNameValuePair("filter", filter)); - } - - URIBuilder builder = new URIBuilder(urls.get(0)); - builder.setParameters(urlParams); - - URI uri = builder.build(); - LOG.info(String.format("GET %s", uri)); - HttpGet request = getReq(uri, waitMs); - CloseableHttpResponse response = getClient().execute(request); - - int httpStatus = response.getStatusLine().getStatusCode(); - HttpEntity entity = response.getEntity(); - String body = (entity != null) ? EntityUtils.toString(entity) : null; - - LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus, - (body != null ? body.length() : "null"))); - - response.close(); - if (httpStatus == 200 && body != null) { - JSONArray json = new JSONArray(body); - LOG.info(String.format("Got %d messages from DMaaP", json.length())); - for (int i = 0; i < json.length(); i++) { - out.add(json.getString(i)); - } - } else { - LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body)); - sleep(waitMs); - } - } catch (Exception e) { - if (urls.size() > 1) { - String failedUrl = urls.remove(0); - urls.add(failedUrl); - LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl, - urls.get(0))); - } - LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e); - sleep(waitMs); - } - - return out; - } - - @Override - public List fetch() { - return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); - } - - @Override - public String toString() { - String hostStr = (urls == null || urls.isEmpty()) ? "N/A" : urls.get(0); - return String.format("Consumer listening to [%s]", hostStr); - } - - @Override - public void useHttps(boolean yes) { - useHttps = yes; - } - - private void sleep(int ms) { - LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000)); - try { - Thread.sleep(ms); - } catch (InterruptedException e1) { - LOG.error("Interrupted while sleeping"); - } - } - - @Override - public void close() { - // Nothing to do - } - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java deleted file mode 100644 index 560c09be4..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java +++ /dev/null @@ -1,138 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.http; - -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.StringEntity; -import org.onap.appc.adapter.message.Producer; - -public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer { - - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class); - - private static final String CONTENT_TYPE = "application/cambria"; - private static final String URL_TEMPLATE = "%s/events/%s"; - - private List hosts; - private Set topics; - - private boolean useHttps = false; - - public HttpDmaapProducerImpl(Collection urls, String topicName) { - hosts = new ArrayList(); - topics = new HashSet(); - topics.add(topicName); - - for (String host : urls) { - hosts.add(formatHostString(host)); - } - } - - public HttpDmaapProducerImpl(Collection urls, Set topicNames) { - hosts = new ArrayList(); - topics = topicNames; - - for (String host : urls) { - hosts.add(formatHostString(host)); - } - } - - @Override - public void updateCredentials(String user, String pass) { - LOG.debug(String.format("Setting auth to %s for %s", user, this.toString())); - this.setBasicAuth(user, pass); - } - - @Override - public boolean post(String partition, String data) { - int sent = 0; - try { - HttpPost request = postReq(null); - request.setHeader("Content-Type", CONTENT_TYPE); - request.setEntity(new StringEntity(bodyLine(partition, data))); - - for (String topic : topics) { - String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic); - try { - request.setURI(new URI(uriStr)); - CloseableHttpResponse response = getClient().execute(request); - if (response.getStatusLine().getStatusCode() == 200) { - sent++; - } - response.close(); - } catch (Exception sendEx) { - LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()), - sendEx); - if (hosts.size() > 1) { - String failedUrl = hosts.remove(0); - hosts.add(failedUrl); - LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", - failedUrl, hosts.get(0))); - } - } - } - } catch (Exception buildEx) { - LOG.error( - String.format("Failed to build request with string [%s]. Message not sent to any topic. Reason: %s", - data, buildEx.getMessage()), - buildEx); - } - return sent == topics.size(); - } - - @Override - public void useHttps(boolean yes) { - useHttps = yes; - } - - /** - * Format the body for the application/cambria content type with no partitioning. See - * - * @param msg - * The message body to format - * @return A string in the application/cambria content type - */ - private String bodyLine(String partition, String msg) { - String p = (partition == null) ? "" : partition; - String m = (msg == null) ? "" : msg; - return String.format("%d.%d.%s%s", p.length(), m.length(), p, m); - } - - @Override - public void close() { - // Nothing to do - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java deleted file mode 100644 index ddf630545..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java +++ /dev/null @@ -1,234 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.impl; - -import java.io.IOException; -import java.util.*; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -//import com.att.nsa.cambria.client.CambriaClientBuilders; -//import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -//import com.att.nsa.cambria.client.CambriaConsumer; - -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.MRConsumer; -import org.apache.commons.lang3.StringUtils; -import org.onap.appc.adapter.message.Consumer; -import org.onap.appc.configuration.Configuration; -import org.onap.appc.configuration.ConfigurationFactory; -import org.onap.appc.metricservice.MetricRegistry; -import org.onap.appc.metricservice.MetricService; -import org.onap.appc.metricservice.impl.MetricServiceImpl; -import org.onap.appc.metricservice.metric.Metric; -import org.onap.appc.metricservice.metric.MetricType; -import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric; -import org.onap.appc.metricservice.policy.PublishingPolicy; -import org.onap.appc.metricservice.publisher.LogPublisher; -import org.osgi.framework.BundleContext; -import org.osgi.framework.FrameworkUtil; -import org.osgi.framework.ServiceReference; - -public class DmaapConsumerImpl implements Consumer { - - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class); - private static final Configuration configuration = ConfigurationFactory.getConfiguration(); - // Default values - private static final int DEFAULT_TIMEOUT_MS = 60000; - private static final int DEFAULT_LIMIT = 1000; - private static MetricRegistry metricRegistry; - private String topic; - private DmaapRequestCounterMetric dmaapKpiMetric; - private boolean isMetricEnabled=false; - private boolean useHttps = false; - private MRConsumer client = null; - private Properties props = null; - - - public DmaapConsumerImpl(Collection urls, String topicName, String consumerGroupName, String consumerId,String user, String password) { - this(urls, topicName, consumerGroupName, consumerId,user, password,null); - - } - - public DmaapConsumerImpl(Collection urls, String topicName, String consumerGroupName, String consumerId,String user, String password,String filter) { - this.topic = topicName; - this.props = new Properties(); - String urlsStr = StringUtils.join(urls, ','); - props.setProperty("host",urlsStr); - props.setProperty("group",consumerGroupName); - props.setProperty("id",consumerId); - props.setProperty("username",user); - props.setProperty("password",password); - if(filter != null) { - props.setProperty("filter", filter); - } - } - - - private void initMetric() { - LOG.debug("Metric getting initialized"); - MetricService metricService = getMetricservice(); - metricRegistry = metricService.createRegistry("APPC"); - dmaapKpiMetric = metricRegistry.metricBuilderFactory(). - dmaapRequestCounterBuilder(). - withName("DMAAP_KPI").withType(MetricType.COUNTER). - withRecievedMessage(0) - .withPublishedMessage(0) - .build(); - if (metricRegistry.register(dmaapKpiMetric)) { - Metric[] metrics = new Metric[]{dmaapKpiMetric}; - LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); - LogPublisher[] logPublishers = new LogPublisher[1]; - logPublishers[0] = logPublisher; - PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). - scheduledPolicyBuilder().withPublishers(logPublishers). - withMetrics(metrics). - build(); - LOG.debug("Policy getting initialized"); - manuallyScheduledPublishingPolicy.init(); - LOG.debug("Metric initialized"); - } - } - private MRConsumer getClient() { - return getClient(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); - } - - /** - * @return An instance of MRConsumer created from our class variables - */ - private synchronized MRConsumer getClient(int waitMs, int limit) { - try { - props.setProperty("timeout",String.valueOf(waitMs)); - props.setProperty("limit",String.valueOf(limit)); - String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props); - return MRClientFactory.createConsumer ( topicProducerPropFileName); - } catch (IOException e1) { - LOG.error("failed to createConsumer",e1); - return null; - } - } - - @Override - public synchronized void updateCredentials(String key, String secret) { - LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); - String user = key; - String password = secret; - props.setProperty("user",String.valueOf(user)); - props.setProperty("password",String.valueOf(password)); - client = null; - } - - @Override - public List fetch(int waitMs, int limit) { - Properties properties=configuration.getProperties(); - if(properties!=null && properties.getProperty("metric.enabled")!=null ){ - isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); - } - if(isMetricEnabled){ - initMetric(); - } - LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); - List out = new ArrayList(); - - // Create client once and reuse it on subsequent fetches. This is - // to support failover to other servers in the DMaaP cluster. - if (client == null) { - LOG.info("Getting DMaaP Client ..."); - client = getClient(waitMs, limit); - } - try { - for (String s : client.fetch(waitMs, limit)) { - out.add(s); - if(isMetricEnabled){ - ((DmaapRequestCounterMetric)metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage(); - } - } - LOG.debug(String.format("Got %d records from %s", out.size(), this.toString())); - } catch (Exception e) { - // Connection exception - LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage())); - e.printStackTrace(); - try { - LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs)); - Thread.sleep(waitMs); - } catch (InterruptedException e2) { - LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs)); - } - } - - - return out; - } - - /** - * Close consumer Dmaap client - */ - @Override - public void close() { - LOG.debug("Closing Dmaap consumer client...."); - if (client != null) { - client.close(); - } - } - - @Override - public List fetch() { - return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); - } - - @Override - public String toString() { - String hostStr = (props == null || props.getProperty("host") == null? "N/A" : props.getProperty("host")); - String group = (props == null || props.getProperty("group") == null? "N/A" : props.getProperty("group")); - String id = (props == null || props.getProperty("id") == null? "N/A" : props.getProperty("id")); - return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr); - } - - @Override - public void useHttps(boolean yes) { - useHttps = yes; - } - - - private MetricService getMetricservice() { - BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); - // Get AAIadapter reference - ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); - if (sref != null) { - LOG.info("Metric Service from bundlecontext"); - return (MetricServiceImpl) bctx.getService(sref); - - } else { - LOG.info("Metric Service error from bundlecontext"); - LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService"); - return null; - - } - } - - public Metric getMetric(String name){ - return metricRegistry.metric(name); - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java deleted file mode 100644 index 7ed06a9e3..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java +++ /dev/null @@ -1,223 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.impl; - -import java.io.*; -import java.util.*; -import java.util.concurrent.TimeUnit; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -//import com.att.nsa.cambria.client.CambriaBatchingPublisher; -//import com.att.nsa.cambria.client.CambriaClientBuilders; -//import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; - -import com.att.nsa.mr.client.MRBatchingPublisher; -import com.att.nsa.mr.client.MRClientFactory; -import org.apache.commons.lang3.StringUtils; -import org.onap.appc.adapter.message.Producer; -import org.onap.appc.adapter.messaging.dmaap.impl.DmaapUtil; -import org.onap.appc.configuration.Configuration; -import org.onap.appc.configuration.ConfigurationFactory; -import org.onap.appc.metricservice.MetricRegistry; -import org.onap.appc.metricservice.MetricService; -import org.onap.appc.metricservice.metric.Metric; -import org.onap.appc.metricservice.metric.MetricType; -import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric; -import org.onap.appc.metricservice.policy.PublishingPolicy; -import org.onap.appc.metricservice.publisher.LogPublisher; -import org.osgi.framework.BundleContext; -import org.osgi.framework.FrameworkUtil; -import org.osgi.framework.ServiceReference; - -public class DmaapProducerImpl implements Producer { - - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class); - private static final Configuration configuration = ConfigurationFactory.getConfiguration(); - - private Set topics = new HashSet(); - - private Properties props = null; - private static MetricRegistry metricRegistry; - private boolean useHttps = false; - private DmaapRequestCounterMetric dmaapKpiMetric; - private boolean isMetricEnabled=false; - - private Set clients; - - - public DmaapProducerImpl(Collection urls, String topicName, String user, String password) { - this(urls, (Set)null, user, password); - this.topics = new HashSet<>(); - if (topicName != null) { - for (String topic : topicName.split(",")) { - topics.add(topic); - } - } - } - - public DmaapProducerImpl(Collection urls, Set topicNames, String user, String password) { - topics = topicNames; - if(urls == null || user == null || password == null){ - throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password" ); - } - this.props = new Properties(); - String urlsStr = StringUtils.join(urls, ','); - props.setProperty("host",urlsStr); - props.setProperty("id", UUID.randomUUID().toString()); - props.setProperty("username",user); - props.setProperty("password",password); - } - private void initMetric() { - LOG.debug("Metric getting initialized"); - MetricService metricService = getMetricservice(); - metricRegistry=metricService.createRegistry("APPC"); - dmaapKpiMetric = metricRegistry.metricBuilderFactory(). - dmaapRequestCounterBuilder(). - withName("DMAAP_KPI").withType(MetricType.COUNTER). - withRecievedMessage(0) - .withPublishedMessage(0) - .build(); - if(metricRegistry.register(dmaapKpiMetric)) { - Metric[] metrics = new Metric[]{dmaapKpiMetric}; - LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); - LogPublisher[] logPublishers = new LogPublisher[1]; - logPublishers[0] = logPublisher; - PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). - scheduledPolicyBuilder().withPublishers(logPublishers). - withMetrics(metrics). - build(); - LOG.debug("Policy getting initialized"); - manuallyScheduledPublishingPolicy.init(); - LOG.debug("Metric initialized"); - } - - } - private Set getClients() { - Set out = new HashSet(); - for (String topic : topics) { - try { - String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props); - final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher (topicProducerPropFileName); - out.add(client); - } catch (Exception e) { - e.printStackTrace(); - } - } - - return out; - } - - @Override - public synchronized void updateCredentials(String key, String secret) { - LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); - String user = key; - String password = secret; - props.setProperty("user",String.valueOf(user)); - props.setProperty("password",String.valueOf(password)); - clients = null; - } - - @Override - public boolean post(String partition, String data) { - boolean success = true; - Properties properties=configuration.getProperties(); - if(properties!=null && properties.getProperty("metric.enabled")!=null ){ - isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); - } - if(isMetricEnabled){ - initMetric(); - } - - // Create clients once and reuse them on subsequent posts. This is - // to support failover to other servers in the Dmaap cluster. - if ((clients == null) || (clients.isEmpty())) { - LOG.info("Getting CambriaBatchingPublisher Clients ..."); - clients = getClients(); - } - - for (MRBatchingPublisher client : clients) { - try { - LOG.debug(String.format("Posting %s to %s", data, client)); - client.send(partition, data); - } catch (IOException e) { - e.printStackTrace(); - success = false; - } - } - if(isMetricEnabled){ - ( (DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage(); - } - return success; - } - - /** - * Close producer Dmaap client - */ - @Override - public void close() { - if ((clients == null) || (clients.isEmpty())) { - return; - } - - LOG.debug("Closing Dmaap producer clients...."); - for (MRBatchingPublisher client : clients) { - try { - client.close(1, TimeUnit.SECONDS); - } catch (IOException | InterruptedException e) { - LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client)); - e.printStackTrace(); - } - } - } - - @Override - public void useHttps(boolean yes) { - useHttps = yes; - } - - private MetricService getMetricservice() { -/* - return AppcDmaapAdapterActivator.getMetricService(); -*/ - - BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); - ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); - if (sref != null) { - LOG.info("Metric Service from bundlecontext"); - return (MetricService) bctx.getService(sref); - - } else { - LOG.info("Metric Service error from bundlecontext"); - LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService"); - return null; - - } - } - - public Metric getMetric(String name){ - return metricRegistry.metric(name); - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java deleted file mode 100644 index da1b0fa38..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java +++ /dev/null @@ -1,86 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.impl; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; - -public class DmaapUtil { - private final static String delimiter = "_"; - private static String createPreferredRouteFileIfNotExist(String topic) throws IOException { - String topicPreferredRouteFileName = null; - topicPreferredRouteFileName = topic+"preferredRoute.properties"; - File fo= new File(topicPreferredRouteFileName); - if(!fo.exists()) { - ClassLoader classLoader = DmaapUtil.class.getClassLoader(); - InputStream inputStream = classLoader.getResourceAsStream("preferredRoute.txt"); - Properties props = new Properties(); - props.load(inputStream); - String fileName = topic != null ? topic+delimiter+"MR1" : delimiter+"MR1"; - props.setProperty("preferredRouteKey", fileName); - topicPreferredRouteFileName = topic + "preferredRoute.properties"; - props.store(new FileOutputStream(topicPreferredRouteFileName), "preferredRoute.properties file created on the fly for topic:" + topic + " on:" + System.currentTimeMillis()); - } - return topicPreferredRouteFileName; - } - - public static String createConsumerPropFile(String topic, Properties props)throws IOException { - String defaultProfFileName = "consumer.properties"; - String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props); - return topicConsumerPropFileName; - } - - public static String createProducerPropFile(String topic, Properties props)throws IOException { - String defaultProfFileName = "producer.properties"; - String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props); - return topicConsumerPropFileName; - } - - private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props) throws IOException { - ClassLoader classLoader = DmaapUtil.class.getClassLoader(); - InputStream inputStream = classLoader.getResourceAsStream(defaultProfFileName); - Properties defaultProps = new Properties(); - defaultProps.load(inputStream); - defaultProps.setProperty("topic",topic); - - String preferredRouteFileName = DmaapUtil.createPreferredRouteFileIfNotExist(topic); - if(props != null && !props.isEmpty()){ - defaultProps.putAll(props); - } - defaultProps.setProperty("topic",topic); - defaultProps.setProperty("DME2preferredRouterFilePath",preferredRouteFileName); - String id = defaultProps.getProperty("id"); - String topicConsumerPropFileName = defaultProfFileName; - topicConsumerPropFileName = id != null ? id+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName; - topicConsumerPropFileName = topic != null ? topic+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName; - - defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName+" file created on the fly for topic:"+topic+" on:"+System.currentTimeMillis()); - return topicConsumerPropFileName; - } - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java deleted file mode 100644 index bf7649026..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java +++ /dev/null @@ -1,178 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.impl; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.onap.ccsdk.sli.core.sli.SvcLogicContext; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -import org.onap.appc.adapter.message.EventSender; -import org.onap.appc.adapter.message.MessageDestination; -import org.onap.appc.adapter.message.Producer; -import org.onap.appc.adapter.message.event.EventHeader; -import org.onap.appc.adapter.message.event.EventMessage; -import org.onap.appc.adapter.message.event.EventStatus; -import org.onap.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; -import org.onap.appc.configuration.Configuration; -import org.onap.appc.configuration.ConfigurationFactory; -import org.onap.appc.exceptions.APPCException; - -public class EventSenderDmaapImpl implements EventSender -{ - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class); - public static final String EVENT_TOPIC_WRITE = "dmaap.event.topic.write"; - public static final String DMAAP_USERNAME = "dmaap.appc.username"; - public static final String DMAAP_PASSWORD = "dmaap.appc.password"; - public static final String EVENT_POOL_MEMBERS = "dmaap.event.pool.members"; - - private static Configuration configuration = ConfigurationFactory.getConfiguration(); - - private Map producerMap = new ConcurrentHashMap<>(); - - public Map getProducerMap() { - return producerMap; - } - - public void setProducerMap(Map producerMap) { - this.producerMap = producerMap; - } - - public EventSenderDmaapImpl(){ - - } - - public void initialize(){ - Properties properties = configuration.getProperties(); - String writeTopic; - String username; - String password; - final List pool = new ArrayList<>(); - - for(MessageDestination destination: MessageDestination.values()){ - writeTopic = properties.getProperty(destination + "." + EVENT_TOPIC_WRITE); - username = properties.getProperty(destination + "." + DMAAP_USERNAME); - password = properties.getProperty(destination + "." + DMAAP_PASSWORD); - String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS); - - if (hostNames != null && !hostNames.isEmpty()) { - LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS)); - Collections.addAll(pool, hostNames.split(",")); - } - - LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); - LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE)); - LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); - Producer producer = new DmaapProducerImpl(pool, writeTopic,username, password); - - for (String url : pool) { - if (url.contains("3905") || url.contains("https")) { - LOG.debug("Producer should use HTTPS"); - producer.useHttps(true); - break; - } - } - producerMap.put(destination.toString(),producer); - } - - } - - @Override - public boolean sendEvent(MessageDestination destination, EventMessage msg) { - String jsonStr = msg.toJson(); - String id = msg.getEventHeader().getEventId(); - LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr)); - Producer producer = producerMap.get(destination.toString()); - return producer.post(id, jsonStr); - } - - @Override - public boolean sendEvent(MessageDestination destination, EventMessage msg, String eventTopicName) { - String jsonStr = msg.toJson(); - String id = msg.getEventHeader().getEventId(); - LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr)); - Producer producer = createProducer(destination, eventTopicName); - return producer.post(id, jsonStr); - } - - private Producer createProducer(MessageDestination destination, String eventTopicName) { - Properties properties = configuration.getProperties(); - final List pool = new ArrayList<>(); - String username = properties.getProperty(destination + "." + DMAAP_USERNAME); - String password = properties.getProperty(destination + "." + DMAAP_PASSWORD); - String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS); - - if (hostNames != null && !hostNames.isEmpty()) { - LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS)); - Collections.addAll(pool, hostNames.split(",")); - } - - LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); - LOG.debug(String.format("writeTopic = %s, taken from property: %s", eventTopicName, destination + "." + EVENT_TOPIC_WRITE)); - LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); - Producer producer = new DmaapProducerImpl(pool, eventTopicName,username, password); - - for (String url : pool) { - if (url.contains("3905") || url.contains("https")) { - LOG.debug("Producer should use HTTPS"); - producer.useHttps(true); - break; - } - } - return producer; - } - - @Override - public boolean sendEvent(MessageDestination destination, Map params, SvcLogicContext ctx) throws APPCException { - - if (params == null) { - String message = "Parameters map is empty (null)"; - LOG.error(message); - throw new APPCException(message); - } - String eventTime = new Date(System.currentTimeMillis()).toString(); - String apiVer = params.get("apiVer"); - String eventId = params.get("eventId"); - String reason = params.get("reason"); - String entityId=params.get("entityId"); - if(entityId!=null){ - reason=reason+"("+entityId+")"; - } - Integer code = Integer.getInteger(params.get("code"), 500); - - if (eventTime == null || apiVer == null || eventId == null || reason == null) { - String message = String.format("Missing input parameters: %s", params); - LOG.error(message); - throw new APPCException(message); - } - EventMessage eventMessage = new EventMessage( - new EventHeader(eventTime, apiVer, eventId), - new EventStatus(code, reason)); - - return sendEvent(destination,eventMessage); - } -} -- cgit 1.2.3-korg