diff options
author | Patrick Brady <pb071s@att.com> | 2017-02-15 23:11:26 -0800 |
---|---|---|
committer | Patrick Brady <pb071s@att.com> | 2017-02-15 23:13:06 -0800 |
commit | 1c192d2dd68724e292b6a30f463085a262e1e813 (patch) | |
tree | d0e2b3a396e169863cd0efaa835c8675e9d5aaac /appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java | |
parent | c69ba05c7508aa7d7f675189a45c8c87569369ef (diff) |
Moving all files to root directory
Change-Id: Ica5535fd6ec85f350fe1640b42137b49f83f10f0
Signed-off-by: Patrick Brady <pb071s@att.com>
Diffstat (limited to 'appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java')
14 files changed, 1154 insertions, 0 deletions
diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java new file mode 100644 index 000000000..98b8aa158 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/AppcDmaapAdapterActivator.java @@ -0,0 +1,113 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap; + +import org.openecomp.appc.configuration.ConfigurationFactory; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceRegistration; + +import java.util.Properties; + +/** + * This activator is used to initialize and terminate the connection pool to one or more providers. + * <p> + * The CDP abstraction layer supports multiple types of providers, with each provider supporting multiple tenants. The + * "connection" to a specific tenant on a specific provider is represented by a "context" object. These context objects + * are authenticated to a specific tenant on the provider, but can be reused from one request to another. Contexts are + * slow to set up and are resource intensive, so they are cached. However, the contexts for a specific tenant on a + * specific provider must be cached separately. + * </p> + * <p> + * Activation of the bundle creates an empty cache which is organized first by provider type, then by tenant name, with + * the contents being an empty pool of contexts for that provider/tenant combination. The pool is created on first use, + * and retained for as long as the bundle is active. + * </p> + * <p> + * When the bundle is deactivated, the cache is torn down with all contexts being closed. + * </p> + */ +public class AppcDmaapAdapterActivator implements BundleActivator { + private ServiceRegistration registration = null; + + /** + * The logger to be used + */ + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AppcDmaapAdapterActivator.class); + + /** + * Called when this bundle is started so the Framework can perform the bundle-specific activities necessary to start + * this bundle. This method can be used to register services or to allocate any resources that this bundle needs. + * <p> + * This method must complete and return to its caller in a timely manner. + * </p> + * + * @param bundleContext + * The execution context of the bundle being started. + * @throws java.lang.Exception + * If this method throws an exception, this bundle is marked as stopped and the Framework will remove + * this bundle's listeners, unregister all services registered by this bundle, and release all services + * used by this bundle. + * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext) + */ + @Override + public void start(final BundleContext bundleContext) throws Exception { + LOG.info("Starting Bundle " + getName()); + + /* if (registration == null) { + Properties properties = ConfigurationFactory.getConfiguration().getProperties(); + registration = bundleContext.registerService(EventSender.class, new EventSenderImpl(properties), null); + }*/ + } + + /** + * Called when this bundle is stopped so the Framework can perform the bundle-specific activities necessary to stop + * the bundle. In general, this method should undo the work that the BundleActivator.start method started. There + * should be no active threads that were started by this bundle when this bundle returns. A stopped bundle must not + * call any Framework objects. + * <p> + * This method must complete and return to its caller in a timely manner. + * </p> + * + * @param ctx + * The execution context of the bundle being stopped. + * @throws java.lang.Exception + * If this method throws an exception, the bundle is still marked as stopped, and the Framework will + * remove the bundle's listeners, unregister all services registered by the bundle, and release all + * services used by the bundle. * + * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext) + */ + @Override + public void stop(BundleContext ctx) throws Exception { + /*if (this.registration != null) { + this.registration.unregister(); + }*/ + LOG.info("Stopped Bundle " + getName()); + } + + public String getName() { + return "DMaaP Adapter"; + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java new file mode 100644 index 000000000..7c282911d --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CallableConsumer.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap; + +import java.util.List; +import java.util.concurrent.Callable; + +public class CallableConsumer implements Callable<List<String>> { + + private Consumer consumer; + + private int timeout = 15000; + private int limit = 1000; + + public CallableConsumer(Consumer c) { + this.consumer = c; + } + + public CallableConsumer(Consumer c, int waitMs, int fetchSize) { + this.consumer = c; + this.timeout = waitMs; + this.limit = fetchSize; + } + + @Override + public List<String> call() { + return consumer.fetch(timeout, limit); + } + + /** + * The maximum amount of time to keep a connection alive. Currently is set to waitMs + 10s + * + * @return An integer representing the maximum amount of time to keep this thread alive + */ + public int getMaxLife() { + return 10000 + timeout; + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java new file mode 100644 index 000000000..654ec6f7f --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/CommonHttpClient.java @@ -0,0 +1,105 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap; + +import java.net.URI; + +import org.apache.commons.codec.binary.Base64; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.config.RequestConfig.Builder; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; + +public class CommonHttpClient { + + public static final int HTTPS_PORT = 3905; + + private String AUTH_STR; + + protected void setBasicAuth(String username, String password) { + if (username != null && password != null) { + String plain = String.format("%s:%s", username, password); + AUTH_STR = Base64.encodeBase64String(plain.getBytes()); + } else { + AUTH_STR = null; + } + } + + public HttpGet getReq(URI uri, int timeoutMs) throws Exception { + HttpGet out = (uri == null) ? new HttpGet() : new HttpGet(uri); + if (AUTH_STR != null) { + out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); + } + out.setConfig(getConfig(timeoutMs)); + return out; + } + + public HttpPost postReq(String url) throws Exception { + HttpPost out = (url == null) ? new HttpPost() : new HttpPost(url); + if (AUTH_STR != null) { + out.setHeader("Authorization", String.format("Basic %s", AUTH_STR)); + } + out.setConfig(getConfig(0)); + return out; + } + + private RequestConfig getConfig(int timeoutMs) { + Builder builder = RequestConfig.custom(); + builder.setSocketTimeout(timeoutMs + 5000); + return builder.build(); + } + + public CloseableHttpClient getClient() { + return getClient(false); + } + + public CloseableHttpClient getClient(boolean useHttps) { + return HttpClientBuilder.create().build(); + } + + public String formatHostString(String host) { + return formatHostString(host, host.contains(String.valueOf(HTTPS_PORT))); + } + + public String formatHostString(String host, boolean useHttps) { + // Trim trailing slash + String out = host.endsWith("/") ? host.substring(0, host.length() - 1) : host; + + boolean hasProto = out.startsWith("http"); + boolean hasPort = out.contains(":"); + + // Add protocol + if (!hasProto) { + out = String.format("%s%s", (useHttps) ? "https://" : "http://", out); + } + + // Add port + if (!hasPort) { + out = String.format("%s:%d", out, (useHttps) ? 3905 : 3904); + } + + return out; + + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java new file mode 100644 index 000000000..32034e5fb --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Consumer.java @@ -0,0 +1,68 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap; + +import java.util.List; + +public interface Consumer { + + /** + * Gets a batch of messages from the topic. Defaults to 1000 messages with 15s wait for messages if empty. + * + * @return A list of strings representing the messages pulled from the topic. + */ + public List<String> fetch(); + + /** + * Gets a batch of messages from the topic. + * + * @param waitMs + * The amount of time to wait in milliseconds if the topic is empty for data to be written. Should be no + * less than 15000ms to prevent too many requests + * @param limit + * The amount of messages to fetch + * @return A list of strings representing the messages pulled from the topic. + */ + public List<String> fetch(int waitMs, int limit); + + /** + * Updates the api credentials for making authenticated requests + * + * @param apiKey + * The public key to authenticate with + * @param apiSecret + * The secret key to authenticate with + */ + public void updateCredentials(String apiKey, String apiSecret); + + // TODO - Implement once Cambria allows you to set outside of constructor + // public void setFilter(String filter); + + /** + * Creates a dmaap client using a https connection + * + * @param yes + * True if https should be used, false otherwise + */ + public void useHttps(boolean yes); + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java new file mode 100644 index 000000000..6e16d896b --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapConsumer.java @@ -0,0 +1,163 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import org.apache.http.HttpEntity; +import org.apache.http.NameValuePair; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; +import org.json.JSONArray; + +public class DmaapConsumer extends CommonHttpClient implements Consumer { + + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumer.class); + + // Default values + private static final int DEFAULT_TIMEOUT_MS = 15000; + private static final int DEFAULT_LIMIT = 1000; + private static final String HTTPS_PORT = ":3905"; + private static final String URL_TEMPLATE = "%s/events/%s/%s/%s"; + + private List<String> urls; + private String filter; + + private boolean useHttps = false; + + public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId) { + this(hosts, topicName, consumerName, consumerId, null); + } + + public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId, + String filter) { + this(hosts, topicName, consumerName, consumerId, filter, null, null); + } + + public DmaapConsumer(Collection<String> hosts, String topicName, String consumerName, String consumerId, + String filter, String user, String password) { + urls = new ArrayList<String>(); + for (String host : hosts) { + urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId)); + } + this.filter = filter; + updateCredentials(user, password); + } + + @Override + public void updateCredentials(String user, String pass) { + LOG.debug(String.format("Setting auth to %s for %s", user, this.toString())); + this.setBasicAuth(user, pass); + } + + @Override + public List<String> fetch(int waitMs, int limit) { + LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString())); + List<String> out = new ArrayList<String>(); + try { + List<NameValuePair> urlParams = new ArrayList<NameValuePair>(); + urlParams.add(new BasicNameValuePair("timeout", String.valueOf(waitMs))); + urlParams.add(new BasicNameValuePair("limit", String.valueOf(limit))); + if (filter != null) { + urlParams.add(new BasicNameValuePair("filter", filter)); + } + + URIBuilder builder = new URIBuilder(urls.get(0)); + builder.setParameters(urlParams); + + URI uri = builder.build(); + LOG.info(String.format("GET %s", uri)); + HttpGet request = getReq(uri, waitMs); + CloseableHttpResponse response = getClient().execute(request); + + int httpStatus = response.getStatusLine().getStatusCode(); + HttpEntity entity = response.getEntity(); + String body = (entity != null) ? EntityUtils.toString(entity) : null; + + LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus, + (body != null ? body.length() : "null"))); + + response.close(); + if (httpStatus == 200 && body != null) { + JSONArray json = new JSONArray(body); + LOG.info(String.format("Got %d messages from DMaaP", json.length())); + for (int i = 0; i < json.length(); i++) { + out.add(json.getString(i)); + } + } else { + LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body)); + sleep(waitMs); + } + } catch (Exception e) { + if (urls.size() > 1) { + String failedUrl = urls.remove(0); + urls.add(failedUrl); + LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl, + urls.get(0))); + } + LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e); + sleep(waitMs); + } + + return out; + } + + @Override + public List<String> fetch() { + return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT); + } + + @Override + public String toString() { + String hostStr = (urls == null && !urls.isEmpty()) ? "N/A" : urls.get(0); + return String.format("Consumer listening to [%s]", hostStr); + } + + @Override + public void useHttps(boolean yes) { + useHttps = yes; + } + + private void sleep(int ms) { + LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000)); + try { + Thread.sleep(ms); + } catch (InterruptedException e1) { + LOG.error("Interrupted while sleeping"); + } + } + + public void close(){ + //not used yet + } + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java new file mode 100644 index 000000000..efbe194ba --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapDestination.java @@ -0,0 +1,26 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap; + +public enum DmaapDestination { + DCAE +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java new file mode 100644 index 000000000..6845177b1 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/DmaapProducer.java @@ -0,0 +1,133 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; + +public class DmaapProducer extends CommonHttpClient implements Producer { + + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducer.class); + + private static final String CONTENT_TYPE = "application/cambria"; + private static final String URL_TEMPLATE = "%s/events/%s"; + + private List<String> hosts; + private Set<String> topics; + + private boolean useHttps = false; + + public DmaapProducer(Collection<String> urls, String topicName) { + hosts = new ArrayList<String>(); + topics = new HashSet<String>(); + topics.add(topicName); + + for (String host : urls) { + hosts.add(formatHostString(host)); + } + } + + public DmaapProducer(Collection<String> urls, Set<String> topicNames) { + hosts = new ArrayList<String>(); + topics = topicNames; + + for (String host : urls) { + hosts.add(formatHostString(host)); + } + } + + @Override + public void updateCredentials(String user, String pass) { + LOG.debug(String.format("Setting auth to %s for %s", user, this.toString())); + this.setBasicAuth(user, pass); + } + + @Override + public boolean post(String partition, String data) { + int sent = 0; + try { + HttpPost request = postReq(null); + request.setHeader("Content-Type", CONTENT_TYPE); + request.setEntity(new StringEntity(bodyLine(partition, data))); + + for (String topic : topics) { + String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic); + try { + request.setURI(new URI(uriStr)); + CloseableHttpResponse response = getClient().execute(request); + if (response.getStatusLine().getStatusCode() == 200) { + sent++; + } + response.close(); + } catch (Exception sendEx) { + LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()), + sendEx); + if (hosts.size() > 1) { + String failedUrl = hosts.remove(0); + hosts.add(failedUrl); + LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", + failedUrl, hosts.get(0))); + } + } + } + } catch (Exception buildEx) { + LOG.error( + String.format("Failed to build request with string [%s]. Message not sent to any topic. Reason: %s", + data, buildEx.getMessage()), + buildEx); + } + return sent == topics.size(); + } + + @Override + public void useHttps(boolean yes) { + useHttps = yes; + } + + /** + * Format the body for the application/cambria content type with no partitioning. + * + * @param msg + * The message body to format + * @return A string in the application/cambria content type + */ + private String bodyLine(String partition, String msg) { + String p = (partition == null) ? "" : partition; + String m = (msg == null) ? "" : msg; + return String.format("%d.%d.%s%s", p.length(), m.length(), p, m); + } + + public void close(){ + //not used yet + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java new file mode 100644 index 000000000..7d4a7c090 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/EventSender.java @@ -0,0 +1,35 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap; + +import java.util.Map; + +import org.openecomp.appc.adapter.dmaap.event.EventMessage; +import org.openecomp.appc.exceptions.APPCException; +import org.openecomp.sdnc.sli.SvcLogicContext; +import org.openecomp.sdnc.sli.SvcLogicJavaPlugin; + + +public interface EventSender extends SvcLogicJavaPlugin{ + boolean sendEvent(DmaapDestination destination, EventMessage msg); + boolean sendEvent(DmaapDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException; +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java new file mode 100644 index 000000000..183e618ba --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Manager.java @@ -0,0 +1,45 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap; + +import java.util.Set; + +public interface Manager { + + /** + * Updates the api credentials for making authenticated requests + * + * @param apiKey + * The public key to authenticate with + * @param apiSecret + * The secret key to authenticate with + */ + public void updateCredentials(String apiKey, String apiSecret); + + /** + * Return a set of strings representing topics that the user can see + * + * @return A set of strings with topic names or an empty set if no topics are visible + */ + public Set<String> getTopics(); + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java new file mode 100644 index 000000000..f19c516be --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/Producer.java @@ -0,0 +1,46 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap; + +public interface Producer { + + public boolean post(String partition, String data); + + /** + * Updates the api credentials for making authenticated requests + * + * @param apiKey + * The public key to authenticate with + * @param apiSecret + * The secret key to authenticate with + */ + public void updateCredentials(String apiKey, String apiSecret); + + /** + * Creates a dmaap client using a https connection + * + * @param yes + * True if https should be used, false otherwise + */ + public void useHttps(boolean yes); + +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java new file mode 100644 index 000000000..dd951fe37 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventHeader.java @@ -0,0 +1,64 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap.event; + +import com.fasterxml.jackson.annotation.JsonProperty; + + +public class EventHeader { + + @JsonProperty("eventTime") + private final String eventTime; + + @JsonProperty("apiVer") + private final String apiVer; + + @JsonProperty("eventId") + private final String eventId; + + public EventHeader(String eventTime, String apiVer, String eventId) { + this.eventTime = eventTime; + this.apiVer = apiVer; + this.eventId = eventId; + } + + public String getEventTime() { + return eventTime; + } + + public String getApiVer() { + return apiVer; + } + + public String getEventId() { + return eventId; + } + + @Override + public String toString() { + return "EventHeader{" + + "eventTime='" + eventTime + '\'' + + ", apiVer='" + apiVer + '\'' + + ", eventId='" + eventId + '\'' + + '}'; + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java new file mode 100644 index 000000000..af5cff2f9 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventMessage.java @@ -0,0 +1,99 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap.event; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize.Inclusion; + +import java.io.IOException; +import java.io.Serializable; + +/* + { + "EventHeader": { + "eventTime": "2016-03-15T10:59:33.79Z", + "apiVer": "1.01", + "EventId": "<ECOMP_EVENT_ID>", + }, + "EventStatus": { + "code": "NNN", + "reason": "A reason" + } + } +*/ + + + + +@JsonSerialize(include = Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class EventMessage implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @JsonProperty("eventHeader") + private EventHeader eventHeader; + @JsonProperty("eventStatus") + private EventStatus eventStatus; + + public EventMessage(EventHeader eventHeader, EventStatus eventStatus) { + this.eventHeader = eventHeader; + this.eventStatus = eventStatus; + } + + public EventHeader getEventHeader() { + return eventHeader; + } + + public void setEventHeader(EventHeader eventHeader) { + this.eventHeader = eventHeader; + } + + public EventStatus getEventStatus() { + return eventStatus; + } + + public void setEventStatus(EventStatus eventStatus) { + this.eventStatus = eventStatus; + } + + public String toJson() { + try { + return OBJECT_MAPPER.writeValueAsString(this); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String toString() { + return "EventMessage{" + + "eventHeader=" + eventHeader + + ", eventStatus=" + eventStatus + + '}'; + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java new file mode 100644 index 000000000..f5d7a59d4 --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/event/EventStatus.java @@ -0,0 +1,56 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap.event; + +import com.fasterxml.jackson.annotation.JsonProperty; + + +public class EventStatus { + + @JsonProperty("code") + private final Integer code; + + @JsonProperty("reason") + private final String reason; + + public EventStatus(Integer code, String aReason) { + this.code = code; + reason = aReason; + } + + + public Integer getCode() { + return code; + } + + public String getReason() { + return reason; + } + + @Override + public String toString() { + return "EventStatus{" + + "code=" + code + + ", reason='" + reason + '\'' + + '}'; + } +} diff --git a/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java new file mode 100644 index 000000000..0f7d40f5b --- /dev/null +++ b/appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/openecomp/appc/adapter/dmaap/impl/EventSenderImpl.java @@ -0,0 +1,143 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.adapter.dmaap.impl; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import org.openecomp.appc.adapter.dmaap.EventSender; +import org.openecomp.appc.adapter.dmaap.Producer; +import org.openecomp.appc.adapter.dmaap.DmaapDestination; +import org.openecomp.appc.adapter.dmaap.event.EventHeader; +import org.openecomp.appc.adapter.dmaap.event.EventMessage; +import org.openecomp.appc.adapter.dmaap.event.EventStatus; +import org.openecomp.appc.adapter.dmaap.DmaapProducer; +import org.openecomp.appc.configuration.Configuration; +import org.openecomp.appc.configuration.ConfigurationFactory; +import org.openecomp.appc.exceptions.APPCException; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.openecomp.sdnc.sli.SvcLogicContext; + + +public class EventSenderImpl implements EventSender +{ + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderImpl.class); + public static final String EVENT_TOPIC_WRITE = "event.topic.write"; + public static final String EVENT_CLIENT_KEY = "event.client.key"; + public static final String EVENT_CLIENT_SECRET = "event.client.secret"; + public static final String EVENT_POOL_MEMBERS = "event.pool.members"; + + private static Configuration configuration = ConfigurationFactory.getConfiguration(); + + private Map<String,Producer> producerMap = new ConcurrentHashMap<>(); + + public Map<String, Producer> getProducerMap() { + return producerMap; + } + + public void setProducerMap(Map<String, Producer> producerMap) { + this.producerMap = producerMap; + } + + public EventSenderImpl(){ + + } + + public void initialize(){ + Properties properties = configuration.getProperties(); + String writeTopic; + String apiKey; + String apiSecret; + final List<String> pool = new ArrayList<>(); + + for(DmaapDestination destination:DmaapDestination.values()){ + writeTopic = properties.getProperty(destination + "." + EVENT_TOPIC_WRITE); + apiKey = properties.getProperty(destination + "." + EVENT_CLIENT_KEY); + apiSecret = properties.getProperty(destination + "." + EVENT_CLIENT_SECRET); + String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS); + + if (hostNames != null && !hostNames.isEmpty()) { + LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS)); + Collections.addAll(pool, hostNames.split(",")); + } + + LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS)); + LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE)); + LOG.debug(String.format("apiKey = %s, taken from property: %s", apiKey, destination + "." + EVENT_CLIENT_KEY)); + Producer producer = new DmaapProducer(pool, writeTopic); + + if (apiKey != null && apiSecret != null) { + producer.updateCredentials(apiKey, apiSecret); + } + + for (String url : pool) { + if (url.contains("3905") || url.contains("https")) { + LOG.debug("Producer should use HTTPS"); + producer.useHttps(true); + break; + } + } + producerMap.put(destination.toString(),producer); + } + + } + + @Override + public boolean sendEvent(DmaapDestination destination,EventMessage msg) { + String jsonStr = msg.toJson(); + String id = msg.getEventHeader().getEventId(); + LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr)); + Producer producer = producerMap.get(destination.toString()); + return producer.post(id, jsonStr); + } + + @Override + public boolean sendEvent(DmaapDestination destination,Map<String, String> params, SvcLogicContext ctx) throws APPCException { + + if (params == null) { + String message = "Parameters map is empty (null)"; + LOG.error(message); + throw new APPCException(message); + } + String eventTime = new Date(System.currentTimeMillis()).toString(); + String apiVer = params.get("apiVer"); + String eventId = params.get("eventId"); + String reason = params.get("reason"); + String entityId=params.get("entityId"); + if(entityId!=null){ + reason=reason+"("+entityId+")"; + } + Integer code = Integer.getInteger(params.get("code"), 500); + + if (eventTime == null || apiVer == null || eventId == null || reason == null) { + String message = String.format("Missing input parameters: %s", params); + LOG.error(message); + throw new APPCException(message); + } + EventMessage dmaapEventMessage = new EventMessage( + new EventHeader(eventTime, apiVer, eventId), + new EventStatus(code, reason)); + + return sendEvent(destination,dmaapEventMessage); + } +} |