diff options
author | Patrick Brady <pb071s@att.com> | 2017-12-13 11:14:21 -0800 |
---|---|---|
committer | Patrick Brady <pb071s@att.com> | 2017-12-13 11:14:31 -0800 |
commit | 161df8a94bb3b0c34ed16fd4fdba078bd1eeef9a (patch) | |
tree | 21f410d053d5e390902d72bb3ebe6889451dd633 /appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp | |
parent | 0756759f39e125b02d63b4e93de83b3c6b13beea (diff) |
Second part of onap rename
This is the second commit of the rename. The folder structure
is renamed for appc-adapters and appc-config in this commit.
Change-Id: Iaa2b8c937ff1ca1b5d1178128961fb115ee65d9b
Signed-off-by: Patrick Brady <pb071s@att.com>
Issue-ID: APPC-13
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp')
8 files changed, 0 insertions, 1240 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java deleted file mode 100644 index 812f80121..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/AppcDmaapAdapterActivator.java +++ /dev/null @@ -1,106 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.osgi.framework.BundleActivator; -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceRegistration; - - -/** - * This activator is used to initialize and terminate the connection pool to one or more providers. - * <p> - * The CDP abstraction layer supports multiple types of providers, with each provider supporting multiple tenants. The - * "connection" to a specific tenant on a specific provider is represented by a "context" object. These context objects - * are authenticated to a specific tenant on the provider, but can be reused from one request to another. Contexts are - * slow to set up and are resource intensive, so they are cached. However, the contexts for a specific tenant on a - * specific provider must be cached separately. - * </p> - * <p> - * Activation of the bundle creates an empty cache which is organized first by provider type, then by tenant name, with - * the contents being an empty pool of contexts for that provider/tenant combination. The pool is created on first use, - * and retained for as long as the bundle is active. - * </p> - * <p> - * When the bundle is deactivated, the cache is torn down with all contexts being closed. - * </p> - */ -public class AppcDmaapAdapterActivator implements BundleActivator { - private ServiceRegistration registration = null; - - /** - * The logger to be used - */ - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AppcDmaapAdapterActivator.class); - - /** - * Called when this bundle is started so the Framework can perform the bundle-specific activities necessary to start - * this bundle. This method can be used to register services or to allocate any resources that this bundle needs. - * <p> - * This method must complete and return to its caller in a timely manner. - * </p> - * - * @param bundleContext - * The execution context of the bundle being started. - * @throws java.lang.Exception - * If this method throws an exception, this bundle is marked as stopped and the Framework will remove - * this bundle's listeners, unregister all services registered by this bundle, and release all services - * used by this bundle. - * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext) - */ - @Override - public void start(final BundleContext bundleContext) throws Exception { - LOG.info("Starting Bundle " + getName()); - } - - /** - * Called when this bundle is stopped so the Framework can perform the bundle-specific activities necessary to stop - * the bundle. In general, this method should undo the work that the BundleActivator.start method started. There - * should be no active threads that were started by this bundle when this bundle returns. A stopped bundle must not - * call any Framework objects. - * <p> - * This method must complete and return to its caller in a timely manner. - * </p> - * - * @param ctx - * The execution context of the bundle being stopped. - * @throws java.lang.Exception - * If this method throws an exception, the bundle is still marked as stopped, and the Framework will - * remove the bundle's listeners, unregister all services registered by the bundle, and release all - * services used by the bundle. * - * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext) - */ - @Override - public void stop(BundleContext ctx) throws Exception { - LOG.info("Stopped Bundle " + getName()); - } - - public String getName() { - return "DMaaP Adapter"; - } - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java deleted file mode 100644 index 76b050d8e..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/CommonHttpClient.java +++ /dev/null @@ -1,108 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.http; - -import java.net.URI; - -import org.apache.commons.codec.binary.Base64; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.config.RequestConfig.Builder; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; - -public class CommonHttpClient { - - public static final int HTTPS_PORT = 3905; - - private String AUTH_STR; - - protected void setBasicAuth(String username, String password) { - if (username != null && password != null) { - String plain = String.format("%s:%s", username, password); - AUTH_STR = Base64.encodeBase64String(plain.getBytes()); - } else { - AUTH_STR = null; - } - } - - public HttpGet getReq(URI uri, int timeoutMs) throws Exception { - HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri); - if (AUTH_STR != null) { - out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); - } - out.setConfig(getConfig(timeoutMs)); - return out; - } - - public HttpPost postReq(String url) throws Exception { - HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url); - if (AUTH_STR != null) { - out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); - } - out.setConfig(getConfig(0)); - return out; - } - - private RequestConfig getConfig(int timeoutMs) { - Builder builder = RequestConfig.custom(); - builder.setSocketTimeout(timeoutMs + 5000); - return builder.build(); - } - - public CloseableHttpClient getClient() { - return getClient(false); - } - - public CloseableHttpClient getClient(boolean useHttps) { - return HttpClientBuilder.create().build(); - } - - public String formatHostString(String host) { - return formatHostString(host, host.contains(String.valueOf(HTTPS_PORT))); - } - - public String formatHostString(String host, boolean useHttps) { - // Trim trailing slash - String out = host.endsWith("/") ? host.substring(0, host.length() - 1) : host; - - boolean hasProto = out.startsWith("http"); - boolean hasPort = out.contains(":"); - - // Add protocol - if (!hasProto) { - out = String.format("%s%s", (useHttps) ? "https://" : "http://", out); - } - - // Add port - if (!hasPort) { - out = String.format("%s:%d", out, (useHttps) ? 3905 : 3904); - } - - return out; - - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java deleted file mode 100644 index df81e9718..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapConsumerImpl.java +++ /dev/null @@ -1,167 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.http; - -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import org.apache.http.HttpEntity; -import org.apache.http.NameValuePair; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.message.BasicNameValuePair; -import org.apache.http.util.EntityUtils; -import org.json.JSONArray; -import org.onap.appc.adapter.message.Consumer; - -public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer { - - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class); - - // Default values - private static final int DEFAULT_TIMEOUT_MS = 15000; - private static final int DEFAULT_LIMIT = 1000; - private static final String HTTPS_PORT = ":3905"; - private static final String URL_TEMPLATE = "%s/events/%s/%s/%s"; - - private List<String> urls; - private String filter; - - private boolean useHttps = false; - - public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId) { - this(hosts, topicName, consumerName, consumerId, null); - } - - public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, - String filter) { - this(hosts, topicName, consumerName, consumerId, filter, null, null); - } - - public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId, - String filter, String user, String password) { - urls = new ArrayList<String>(); - for (String host : hosts) { - urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId)); - } - this.filter = filter; - updateCredentials(user, password); - } - - @Override - public void updateCredentials(String user, String pass) { - LOG.debug(String.format("Setting auth to %s for %s", user, this.toString())); - this.setBasicAuth(user, pass); - } - - @Override - public List<String> fetch(int waitMs, int limit) { - LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); - List<String> out = new ArrayList<String>(); - try { - List<NameValuePair> urlParams = new ArrayList<NameValuePair>(); - urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs))); - urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit))); - if (filter != null) { - urlParams.add(new BasicNameValuePair("filter", filter)); - } - - URIBuilder builder = new URIBuilder(urls.get(0)); - builder.setParameters(urlParams); - - URI uri = builder.build(); - LOG.info(String.format("GET %s", uri)); - HttpGet request = getReq(uri, waitMs); - CloseableHttpResponse response = getClient().execute(request); - - int httpStatus = response.getStatusLine().getStatusCode(); - HttpEntity entity = response.getEntity(); - String body = (entity != null) ? EntityUtils.toString(entity) : null; - - LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus, - (body != null ? body.length() : "null"))); - - response.close(); - if (httpStatus == 200 && body != null) { - JSONArray json = new JSONArray(body); - LOG.info(String.format("Got %d messages from DMaaP", json.length())); - for (int i = 0; i < json.length(); i++) { - out.add(json.getString(i)); - } - } else { - LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body)); - sleep(waitMs); - } - } catch (Exception e) { - if (urls.size() > 1) { - String failedUrl = urls.remove(0); - urls.add(failedUrl); - LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl, - urls.get(0))); - } - LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e); - sleep(waitMs); - } - - return out; - } - - @Override - public List<String> fetch() { - return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); - } - - @Override - public String toString() { - String hostStr = (urls == null || urls.isEmpty()) ? "N/A" : urls.get(0); - return String.format("Consumer listening to [%s]", hostStr); - } - - @Override - public void useHttps(boolean yes) { - useHttps = yes; - } - - private void sleep(int ms) { - LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000)); - try { - Thread.sleep(ms); - } catch (InterruptedException e1) { - LOG.error("Interrupted while sleeping"); - } - } - - @Override - public void close() { - // Nothing to do - } - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java deleted file mode 100644 index 560c09be4..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/http/HttpDmaapProducerImpl.java +++ /dev/null @@ -1,138 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.http; - -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.StringEntity; -import org.onap.appc.adapter.message.Producer; - -public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer { - - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class); - - private static final String CONTENT_TYPE = "application/cambria"; - private static final String URL_TEMPLATE = "%s/events/%s"; - - private List<String> hosts; - private Set<String> topics; - - private boolean useHttps = false; - - public HttpDmaapProducerImpl(Collection<String> urls, String topicName) { - hosts = new ArrayList<String>(); - topics = new HashSet<String>(); - topics.add(topicName); - - for (String host : urls) { - hosts.add(formatHostString(host)); - } - } - - public HttpDmaapProducerImpl(Collection<String> urls, Set<String> topicNames) { - hosts = new ArrayList<String>(); - topics = topicNames; - - for (String host : urls) { - hosts.add(formatHostString(host)); - } - } - - @Override - public void updateCredentials(String user, String pass) { - LOG.debug(String.format("Setting auth to %s for %s", user, this.toString())); - this.setBasicAuth(user, pass); - } - - @Override - public boolean post(String partition, String data) { - int sent = 0; - try { - HttpPost request = postReq(null); - request.setHeader("Content-Type", CONTENT_TYPE); - request.setEntity(new StringEntity(bodyLine(partition, data))); - - for (String topic : topics) { - String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic); - try { - request.setURI(new URI(uriStr)); - CloseableHttpResponse response = getClient().execute(request); - if (response.getStatusLine().getStatusCode() == 200) { - sent++; - } - response.close(); - } catch (Exception sendEx) { - LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()), - sendEx); - if (hosts.size() > 1) { - String failedUrl = hosts.remove(0); - hosts.add(failedUrl); - LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", - failedUrl, hosts.get(0))); - } - } - } - } catch (Exception buildEx) { - LOG.error( - String.format("Failed to build request with string [%s]. Message not sent to any topic. Reason: %s", - data, buildEx.getMessage()), - buildEx); - } - return sent == topics.size(); - } - - @Override - public void useHttps(boolean yes) { - useHttps = yes; - } - - /** - * Format the body for the application/cambria content type with no partitioning. See - * - * @param msg - * The message body to format - * @return A string in the application/cambria content type - */ - private String bodyLine(String partition, String msg) { - String p = (partition == null) ? "" : partition; - String m = (msg == null) ? "" : msg; - return String.format("%d.%d.%s%s", p.length(), m.length(), p, m); - } - - @Override - public void close() { - // Nothing to do - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java deleted file mode 100644 index ddf630545..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java +++ /dev/null @@ -1,234 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.impl; - -import java.io.IOException; -import java.util.*; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -//import com.att.nsa.cambria.client.CambriaClientBuilders; -//import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -//import com.att.nsa.cambria.client.CambriaConsumer; - -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.MRConsumer; -import org.apache.commons.lang3.StringUtils; -import org.onap.appc.adapter.message.Consumer; -import org.onap.appc.configuration.Configuration; -import org.onap.appc.configuration.ConfigurationFactory; -import org.onap.appc.metricservice.MetricRegistry; -import org.onap.appc.metricservice.MetricService; -import org.onap.appc.metricservice.impl.MetricServiceImpl; -import org.onap.appc.metricservice.metric.Metric; -import org.onap.appc.metricservice.metric.MetricType; -import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric; -import org.onap.appc.metricservice.policy.PublishingPolicy; -import org.onap.appc.metricservice.publisher.LogPublisher; -import org.osgi.framework.BundleContext; -import org.osgi.framework.FrameworkUtil; -import org.osgi.framework.ServiceReference; - -public class DmaapConsumerImpl implements Consumer { - - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class); - private static final Configuration configuration = ConfigurationFactory.getConfiguration(); - // Default values - private static final int DEFAULT_TIMEOUT_MS = 60000; - private static final int DEFAULT_LIMIT = 1000; - private static MetricRegistry metricRegistry; - private String topic; - private DmaapRequestCounterMetric dmaapKpiMetric; - private boolean isMetricEnabled=false; - private boolean useHttps = false; - private MRConsumer client = null; - private Properties props = null; - - - public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password) { - this(urls, topicName, consumerGroupName, consumerId,user, password,null); - - } - - public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password,String filter) { - this.topic = topicName; - this.props = new Properties(); - String urlsStr = StringUtils.join(urls, ','); - props.setProperty("host",urlsStr); - props.setProperty("group",consumerGroupName); - props.setProperty("id",consumerId); - props.setProperty("username",user); - props.setProperty("password",password); - if(filter != null) { - props.setProperty("filter", filter); - } - } - - - private void initMetric() { - LOG.debug("Metric getting initialized"); - MetricService metricService = getMetricservice(); - metricRegistry = metricService.createRegistry("APPC"); - dmaapKpiMetric = metricRegistry.metricBuilderFactory(). - dmaapRequestCounterBuilder(). - withName("DMAAP_KPI").withType(MetricType.COUNTER). - withRecievedMessage(0) - .withPublishedMessage(0) - .build(); - if (metricRegistry.register(dmaapKpiMetric)) { - Metric[] metrics = new Metric[]{dmaapKpiMetric}; - LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); - LogPublisher[] logPublishers = new LogPublisher[1]; - logPublishers[0] = logPublisher; - PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). - scheduledPolicyBuilder().withPublishers(logPublishers). - withMetrics(metrics). - build(); - LOG.debug("Policy getting initialized"); - manuallyScheduledPublishingPolicy.init(); - LOG.debug("Metric initialized"); - } - } - private MRConsumer getClient() { - return getClient(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); - } - - /** - * @return An instance of MRConsumer created from our class variables - */ - private synchronized MRConsumer getClient(int waitMs, int limit) { - try { - props.setProperty("timeout",String.valueOf(waitMs)); - props.setProperty("limit",String.valueOf(limit)); - String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props); - return MRClientFactory.createConsumer ( topicProducerPropFileName); - } catch (IOException e1) { - LOG.error("failed to createConsumer",e1); - return null; - } - } - - @Override - public synchronized void updateCredentials(String key, String secret) { - LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); - String user = key; - String password = secret; - props.setProperty("user",String.valueOf(user)); - props.setProperty("password",String.valueOf(password)); - client = null; - } - - @Override - public List<String> fetch(int waitMs, int limit) { - Properties properties=configuration.getProperties(); - if(properties!=null && properties.getProperty("metric.enabled")!=null ){ - isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); - } - if(isMetricEnabled){ - initMetric(); - } - LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); - List<String> out = new ArrayList<String>(); - - // Create client once and reuse it on subsequent fetches. This is - // to support failover to other servers in the DMaaP cluster. - if (client == null) { - LOG.info("Getting DMaaP Client ..."); - client = getClient(waitMs, limit); - } - try { - for (String s : client.fetch(waitMs, limit)) { - out.add(s); - if(isMetricEnabled){ - ((DmaapRequestCounterMetric)metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage(); - } - } - LOG.debug(String.format("Got %d records from %s", out.size(), this.toString())); - } catch (Exception e) { - // Connection exception - LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage())); - e.printStackTrace(); - try { - LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs)); - Thread.sleep(waitMs); - } catch (InterruptedException e2) { - LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs)); - } - } - - - return out; - } - - /** - * Close consumer Dmaap client - */ - @Override - public void close() { - LOG.debug("Closing Dmaap consumer client...."); - if (client != null) { - client.close(); - } - } - - @Override - public List<String> fetch() { - return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); - } - - @Override - public String toString() { - String hostStr = (props == null || props.getProperty("host") == null? "N/A" : props.getProperty("host")); - String group = (props == null || props.getProperty("group") == null? "N/A" : props.getProperty("group")); - String id = (props == null || props.getProperty("id") == null? "N/A" : props.getProperty("id")); - return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr); - } - - @Override - public void useHttps(boolean yes) { - useHttps = yes; - } - - - private MetricService getMetricservice() { - BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); - // Get AAIadapter reference - ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); - if (sref != null) { - LOG.info("Metric Service from bundlecontext"); - return (MetricServiceImpl) bctx.getService(sref); - - } else { - LOG.info("Metric Service error from bundlecontext"); - LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService"); - return null; - - } - } - - public Metric getMetric(String name){ - return metricRegistry.metric(name); - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java deleted file mode 100644 index 7ed06a9e3..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java +++ /dev/null @@ -1,223 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.impl; - -import java.io.*; -import java.util.*; -import java.util.concurrent.TimeUnit; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -//import com.att.nsa.cambria.client.CambriaBatchingPublisher; -//import com.att.nsa.cambria.client.CambriaClientBuilders; -//import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; - -import com.att.nsa.mr.client.MRBatchingPublisher; -import com.att.nsa.mr.client.MRClientFactory; -import org.apache.commons.lang3.StringUtils; -import org.onap.appc.adapter.message.Producer; -import org.onap.appc.adapter.messaging.dmaap.impl.DmaapUtil; -import org.onap.appc.configuration.Configuration; -import org.onap.appc.configuration.ConfigurationFactory; -import org.onap.appc.metricservice.MetricRegistry; -import org.onap.appc.metricservice.MetricService; -import org.onap.appc.metricservice.metric.Metric; -import org.onap.appc.metricservice.metric.MetricType; -import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric; -import org.onap.appc.metricservice.policy.PublishingPolicy; -import org.onap.appc.metricservice.publisher.LogPublisher; -import org.osgi.framework.BundleContext; -import org.osgi.framework.FrameworkUtil; -import org.osgi.framework.ServiceReference; - -public class DmaapProducerImpl implements Producer { - - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class); - private static final Configuration configuration = ConfigurationFactory.getConfiguration(); - - private Set<String> topics = new HashSet<String>(); - - private Properties props = null; - private static MetricRegistry metricRegistry; - private boolean useHttps = false; - private DmaapRequestCounterMetric dmaapKpiMetric; - private boolean isMetricEnabled=false; - - private Set<MRBatchingPublisher> clients; - - - public DmaapProducerImpl(Collection<String> urls, String topicName, String user, String password) { - this(urls, (Set<String>)null, user, password); - this.topics = new HashSet<>(); - if (topicName != null) { - for (String topic : topicName.split(",")) { - topics.add(topic); - } - } - } - - public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) { - topics = topicNames; - if(urls == null || user == null || password == null){ - throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password" ); - } - this.props = new Properties(); - String urlsStr = StringUtils.join(urls, ','); - props.setProperty("host",urlsStr); - props.setProperty("id", UUID.randomUUID().toString()); - props.setProperty("username",user); - props.setProperty("password",password); - } - private void initMetric() { - LOG.debug("Metric getting initialized"); - MetricService metricService = getMetricservice(); - metricRegistry=metricService.createRegistry("APPC"); - dmaapKpiMetric = metricRegistry.metricBuilderFactory(). - dmaapRequestCounterBuilder(). - withName("DMAAP_KPI").withType(MetricType.COUNTER). - withRecievedMessage(0) - .withPublishedMessage(0) - .build(); - if(metricRegistry.register(dmaapKpiMetric)) { - Metric[] metrics = new Metric[]{dmaapKpiMetric}; - LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics); - LogPublisher[] logPublishers = new LogPublisher[1]; - logPublishers[0] = logPublisher; - PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory(). - scheduledPolicyBuilder().withPublishers(logPublishers). - withMetrics(metrics). - build(); - LOG.debug("Policy getting initialized"); - manuallyScheduledPublishingPolicy.init(); - LOG.debug("Metric initialized"); - } - - } - private Set<MRBatchingPublisher> getClients() { - Set<MRBatchingPublisher> out = new HashSet<MRBatchingPublisher>(); - for (String topic : topics) { - try { - String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props); - final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher (topicProducerPropFileName); - out.add(client); - } catch (Exception e) { - e.printStackTrace(); - } - } - - return out; - } - - @Override - public synchronized void updateCredentials(String key, String secret) { - LOG.info(String.format("Setting auth to %s for %s", key, this.toString())); - String user = key; - String password = secret; - props.setProperty("user",String.valueOf(user)); - props.setProperty("password",String.valueOf(password)); - clients = null; - } - - @Override - public boolean post(String partition, String data) { - boolean success = true; - Properties properties=configuration.getProperties(); - if(properties!=null && properties.getProperty("metric.enabled")!=null ){ - isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled")); - } - if(isMetricEnabled){ - initMetric(); - } - - // Create clients once and reuse them on subsequent posts. This is - // to support failover to other servers in the Dmaap cluster. - if ((clients == null) || (clients.isEmpty())) { - LOG.info("Getting CambriaBatchingPublisher Clients ..."); - clients = getClients(); - } - - for (MRBatchingPublisher client : clients) { - try { - LOG.debug(String.format("Posting %s to %s", data, client)); - client.send(partition, data); - } catch (IOException e) { - e.printStackTrace(); - success = false; - } - } - if(isMetricEnabled){ - ( (DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage(); - } - return success; - } - - /** - * Close producer Dmaap client - */ - @Override - public void close() { - if ((clients == null) || (clients.isEmpty())) { - return; - } - - LOG.debug("Closing Dmaap producer clients...."); - for (MRBatchingPublisher client : clients) { - try { - client.close(1, TimeUnit.SECONDS); - } catch (IOException | InterruptedException e) { - LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client)); - e.printStackTrace(); - } - } - } - - @Override - public void useHttps(boolean yes) { - useHttps = yes; - } - - private MetricService getMetricservice() { -/* - return AppcDmaapAdapterActivator.getMetricService(); -*/ - - BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext(); - ServiceReference sref = bctx.getServiceReference(MetricService.class.getName()); - if (sref != null) { - LOG.info("Metric Service from bundlecontext"); - return (MetricService) bctx.getService(sref); - - } else { - LOG.info("Metric Service error from bundlecontext"); - LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService"); - return null; - - } - } - - public Metric getMetric(String name){ - return metricRegistry.metric(name); - } -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java deleted file mode 100644 index da1b0fa38..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/DmaapUtil.java +++ /dev/null @@ -1,86 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.impl; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; - -public class DmaapUtil { - private final static String delimiter = "_"; - private static String createPreferredRouteFileIfNotExist(String topic) throws IOException { - String topicPreferredRouteFileName = null; - topicPreferredRouteFileName = topic+"preferredRoute.properties"; - File fo= new File(topicPreferredRouteFileName); - if(!fo.exists()) { - ClassLoader classLoader = DmaapUtil.class.getClassLoader(); - InputStream inputStream = classLoader.getResourceAsStream("preferredRoute.txt"); - Properties props = new Properties(); - props.load(inputStream); - String fileName = topic != null ? topic+delimiter+"MR1" : delimiter+"MR1"; - props.setProperty("preferredRouteKey", fileName); - topicPreferredRouteFileName = topic + "preferredRoute.properties"; - props.store(new FileOutputStream(topicPreferredRouteFileName), "preferredRoute.properties file created on the fly for topic:" + topic + " on:" + System.currentTimeMillis()); - } - return topicPreferredRouteFileName; - } - - public static String createConsumerPropFile(String topic, Properties props)throws IOException { - String defaultProfFileName = "consumer.properties"; - String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props); - return topicConsumerPropFileName; - } - - public static String createProducerPropFile(String topic, Properties props)throws IOException { - String defaultProfFileName = "producer.properties"; - String topicConsumerPropFileName = createConsumerProducerPropFile(topic, defaultProfFileName,props); - return topicConsumerPropFileName; - } - - private static String createConsumerProducerPropFile(String topic, String defaultProfFileName, Properties props) throws IOException { - ClassLoader classLoader = DmaapUtil.class.getClassLoader(); - InputStream inputStream = classLoader.getResourceAsStream(defaultProfFileName); - Properties defaultProps = new Properties(); - defaultProps.load(inputStream); - defaultProps.setProperty("topic",topic); - - String preferredRouteFileName = DmaapUtil.createPreferredRouteFileIfNotExist(topic); - if(props != null && !props.isEmpty()){ - defaultProps.putAll(props); - } - defaultProps.setProperty("topic",topic); - defaultProps.setProperty("DME2preferredRouterFilePath",preferredRouteFileName); - String id = defaultProps.getProperty("id"); - String topicConsumerPropFileName = defaultProfFileName; - topicConsumerPropFileName = id != null ? id+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName; - topicConsumerPropFileName = topic != null ? topic+delimiter+topicConsumerPropFileName : delimiter+topicConsumerPropFileName; - - defaultProps.store(new FileOutputStream(topicConsumerPropFileName), defaultProfFileName+" file created on the fly for topic:"+topic+" on:"+System.currentTimeMillis()); - return topicConsumerPropFileName; - } - -} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java deleted file mode 100644 index bf7649026..000000000 --- a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/messaging/dmaap/impl/EventSenderDmaapImpl.java +++ /dev/null @@ -1,178 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.adapter.messaging.dmaap.impl; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.onap.ccsdk.sli.core.sli.SvcLogicContext; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -import org.onap.appc.adapter.message.EventSender; -import org.onap.appc.adapter.message.MessageDestination; -import org.onap.appc.adapter.message.Producer; -import org.onap.appc.adapter.message.event.EventHeader; -import org.onap.appc.adapter.message.event.EventMessage; -import org.onap.appc.adapter.message.event.EventStatus; -import org.onap.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl; -import org.onap.appc.configuration.Configuration; -import org.onap.appc.configuration.ConfigurationFactory; -import org.onap.appc.exceptions.APPCException; - -public class EventSenderDmaapImpl implements EventSender -{ - private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class); - public static final String EVENT_TOPIC_WRITE = "dmaap.event.topic.write"; - public static final String DMAAP_USERNAME = "dmaap.appc.username"; - public static final String DMAAP_PASSWORD = "dmaap.appc.password"; - public static final String EVENT_POOL_MEMBERS = "dmaap.event.pool.members"; - - private static Configuration configuration = ConfigurationFactory.getConfiguration(); - - private Map<String,Producer> producerMap = new ConcurrentHashMap<>(); - - public Map<String, Producer> getProducerMap() { - return producerMap; - } - - public void setProducerMap(Map<String, Producer> producerMap) { - this.producerMap = producerMap; - } - - public EventSenderDmaapImpl(){ - - } - - public void initialize(){ - Properties properties = configuration.getProperties(); - String writeTopic; - String username; - String password; - final List<String> pool = new ArrayList<>(); - - for(MessageDestination destination: MessageDestination.values()){ - writeTopic = properties.getProperty(destination + "." + EVENT_TOPIC_WRITE); - username = properties.getProperty(destination + "." + DMAAP_USERNAME); - password = properties.getProperty(destination + "." + DMAAP_PASSWORD); - String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS); - - if (hostNames != null && !hostNames.isEmpty()) { - LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS)); - Collections.addAll(pool, hostNames.split(",")); - } - - LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); - LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE)); - LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); - Producer producer = new DmaapProducerImpl(pool, writeTopic,username, password); - - for (String url : pool) { - if (url.contains("3905") || url.contains("https")) { - LOG.debug("Producer should use HTTPS"); - producer.useHttps(true); - break; - } - } - producerMap.put(destination.toString(),producer); - } - - } - - @Override - public boolean sendEvent(MessageDestination destination, EventMessage msg) { - String jsonStr = msg.toJson(); - String id = msg.getEventHeader().getEventId(); - LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr)); - Producer producer = producerMap.get(destination.toString()); - return producer.post(id, jsonStr); - } - - @Override - public boolean sendEvent(MessageDestination destination, EventMessage msg, String eventTopicName) { - String jsonStr = msg.toJson(); - String id = msg.getEventHeader().getEventId(); - LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr)); - Producer producer = createProducer(destination, eventTopicName); - return producer.post(id, jsonStr); - } - - private Producer createProducer(MessageDestination destination, String eventTopicName) { - Properties properties = configuration.getProperties(); - final List<String> pool = new ArrayList<>(); - String username = properties.getProperty(destination + "." + DMAAP_USERNAME); - String password = properties.getProperty(destination + "." + DMAAP_PASSWORD); - String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS); - - if (hostNames != null && !hostNames.isEmpty()) { - LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS)); - Collections.addAll(pool, hostNames.split(",")); - } - - LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); - LOG.debug(String.format("writeTopic = %s, taken from property: %s", eventTopicName, destination + "." + EVENT_TOPIC_WRITE)); - LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME)); - Producer producer = new DmaapProducerImpl(pool, eventTopicName,username, password); - - for (String url : pool) { - if (url.contains("3905") || url.contains("https")) { - LOG.debug("Producer should use HTTPS"); - producer.useHttps(true); - break; - } - } - return producer; - } - - @Override - public boolean sendEvent(MessageDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException { - - if (params == null) { - String message = "Parameters map is empty (null)"; - LOG.error(message); - throw new APPCException(message); - } - String eventTime = new Date(System.currentTimeMillis()).toString(); - String apiVer = params.get("apiVer"); - String eventId = params.get("eventId"); - String reason = params.get("reason"); - String entityId=params.get("entityId"); - if(entityId!=null){ - reason=reason+"("+entityId+")"; - } - Integer code = Integer.getInteger(params.get("code"), 500); - - if (eventTime == null || apiVer == null || eventId == null || reason == null) { - String message = String.format("Missing input parameters: %s", params); - LOG.error(message); - throw new APPCException(message); - } - EventMessage eventMessage = new EventMessage( - new EventHeader(eventTime, apiVer, eventId), - new EventStatus(code, reason)); - - return sendEvent(destination,eventMessage); - } -} |