diff options
Diffstat (limited to 'appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java')
-rw-r--r-- | appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java | 326 |
1 files changed, 326 insertions, 0 deletions
diff --git a/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java b/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java new file mode 100644 index 000000000..590afbe8b --- /dev/null +++ b/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/impl/EventHandlerImpl.java @@ -0,0 +1,326 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : APP-C + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.listener.impl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.openecomp.appc.adapter.dmaap.Consumer; +import org.openecomp.appc.adapter.dmaap.DmaapConsumer; +import org.openecomp.appc.adapter.dmaap.DmaapProducer; +import org.openecomp.appc.adapter.dmaap.Producer; +import org.openecomp.appc.adapter.dmaap.DmaapConsumer; +import org.openecomp.appc.adapter.dmaap.DmaapProducer; +import org.openecomp.appc.listener.EventHandler; +import org.openecomp.appc.listener.ListenerProperties; +import org.openecomp.appc.listener.ListenerProperties.MessageService; +import org.openecomp.appc.listener.util.Mapper; +import org.openecomp.appc.logging.LoggingConstants; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.slf4j.MDC; + +/** + * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure that only well formed + * messages are sent and received on DMaaP. + * + */ +public class EventHandlerImpl implements EventHandler { + + private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventHandlerImpl.class); + + /* + * The amount of time in seconds to keep a connection to a topic open while waiting for data + */ + private int READ_TIMEOUT = 60; + + /* + * The pool of hosts to query against + */ + private Collection<String> pool; + + /* + * The topic to read messages from + */ + private String readTopic; + + /* + * The topic to write messages to + */ + private Set<String> writeTopics; + + /* + * The client (group) name to use for reading messages + */ + private String clientName; + + /* + * The id of the client (group) that is reading messages + */ + private String clientId; + + /* + * The api public key to use for authentication + */ + private String apiKey; + + /* + * The api secret key to use for authentication + */ + private String apiSecret; + + /* + * A json object containing filter arguments. + */ + private String filter_json; + + private MessageService messageService; + + private Consumer reader = null; + private Producer producer = null; + + public EventHandlerImpl(ListenerProperties props) { + pool = new HashSet<String>(); + writeTopics = new HashSet<String>(); + + if (props != null) { + readTopic = props.getProperty(ListenerProperties.KEYS.TOPIC_READ); + clientName = props.getProperty(ListenerProperties.KEYS.CLIENT_NAME, "APP-C"); + clientId = props.getProperty(ListenerProperties.KEYS.CLIENT_ID, "0"); + apiKey = props.getProperty(ListenerProperties.KEYS.AUTH_USER_KEY); + apiSecret = props.getProperty(ListenerProperties.KEYS.AUTH_SECRET_KEY); + + filter_json = props.getProperty(ListenerProperties.KEYS.TOPIC_READ_FILTER); + + READ_TIMEOUT = Integer + .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT))); + + String hostnames = props.getProperty(ListenerProperties.KEYS.HOSTS); + if (hostnames != null && !hostnames.isEmpty()) { + for (String name : hostnames.split(",")) { + pool.add(name); + } + } + + String writeTopicStr = props.getProperty(ListenerProperties.KEYS.TOPIC_WRITE); + if (writeTopicStr != null) { + for (String topic : writeTopicStr.split(",")) { + writeTopics.add(topic); + } + } + + messageService = MessageService.parse(props.getProperty(ListenerProperties.KEYS.MESSAGE_SERVICE)); + + LOG.info(String.format( + "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Wriring to [%s]. Authenticated using %s", + messageService, hostnames, readTopic, filter_json, writeTopics, apiKey)); + } + } + + @Override + public List<String> getIncomingEvents() { + return getIncomingEvents(1000); + } + + @Override + public List<String> getIncomingEvents(int limit) { + List<String> out = new ArrayList<String>(); + LOG.info(String.format("Getting up to %d incoming events", limit)); + // reuse the consumer object instead of creating a new one every time + if (reader == null) { + LOG.info("Getting Consumer..."); + reader = getConsumer(); + } + for (String item : reader.fetch(READ_TIMEOUT * 1000, limit)) { + out.add(item); + } + LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId)); + return out; + } + + @Override + public <T> List<T> getIncomingEvents(Class<T> cls) { + return getIncomingEvents(cls, 1000); + } + + @Override + public <T> List<T> getIncomingEvents(Class<T> cls, int limit) { + List<String> incomingStrings = getIncomingEvents(limit); + return Mapper.mapList(incomingStrings, cls); + } + + @Override + public void postStatus(String event) { + postStatus(null, event); + } + + @Override + public void postStatus(String partition, String event) { + LOG.debug(String.format("Posting Message [%s]", event)); + if (producer == null) { + LOG.info("Getting Producer..."); + producer = getProducer(); + } + producer.post(partition, event); + } + + /** + * Returns a consumer object for direct access to our Cambria consumer interface + * + * @return An instance of the consumer interface + */ + protected Consumer getConsumer() { + LOG.debug(String.format("Getting Consumer: %s %s/%s/%s", pool, readTopic, clientName, clientId)); + if (filter_json == null && writeTopics.contains(readTopic)) { + LOG.error( + "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****"); + } + Consumer out; + out = new DmaapConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret); + for (String url : pool) { + if (url.contains("3905") || url.contains("https")) { + out.useHttps(true); + break; + } + } + return out; + } + + /** + * Returns a consumer object for direct access to our Cambria producer interface + * + * @return An instance of the producer interface + */ + protected Producer getProducer() { + LOG.debug(String.format("Getting Producer: %s %s", pool, readTopic)); + + Producer out; + out = new DmaapProducer(pool,writeTopics); + + if (apiKey != null && apiSecret != null) { + out.updateCredentials(apiKey, apiSecret); + } + + for (String url : pool) { + if (url.contains("3905") || url.contains("https")) { + out.useHttps(true); + break; + } + } + return out; + } + + @Override + public void closeClients() { + LOG.debug("Closing Consumer and Producer DMaaP clients"); + switch (messageService) { + case DMaaP: + if (reader != null) { + ((DmaapConsumer) reader).close(); + } + if (producer != null) { + ((DmaapProducer) producer).close(); + } + break; + default: + // close DMaaP clients + if (reader != null) { + ((DmaapConsumer) reader).close(); + } + if (producer != null) { + ((DmaapProducer) producer).close(); + } + } + } + + @Override + public String getClientId() { + return clientId; + } + + @Override + public void setClientId(String clientId) { + this.clientId = clientId; + } + + @Override + public String getClientName() { + return clientName; + } + + @Override + public void setClientName(String clientName) { + this.clientName = clientName; + MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, clientName); + } + + @Override + public void addToPool(String hostname) { + pool.add(hostname); + } + + @Override + public Collection<String> getPool() { + return pool; + } + + @Override + public void removeFromPool(String hostname) { + pool.remove(hostname); + } + + @Override + public String getReadTopic() { + return readTopic; + } + + @Override + public void setReadTopic(String readTopic) { + this.readTopic = readTopic; + } + + @Override + public Set<String> getWriteTopics() { + return writeTopics; + } + + @Override + public void setWriteTopics(Set<String> writeTopics) { + this.writeTopics = writeTopics; + } + + @Override + public void clearCredentials() { + apiKey = null; + apiSecret = null; + } + + @Override + public void setCredentials(String key, String secret) { + apiKey = key; + apiSecret = secret; + } +} |