diff options
Diffstat (limited to 'src/main/java/org/openecomp/event')
4 files changed, 443 insertions, 0 deletions
diff --git a/src/main/java/org/openecomp/event/EventBusComponent.java b/src/main/java/org/openecomp/event/EventBusComponent.java new file mode 100644 index 0000000..6795931 --- /dev/null +++ b/src/main/java/org/openecomp/event/EventBusComponent.java @@ -0,0 +1,52 @@ +/** + * ============LICENSE_START======================================================= + * DataRouter + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License ati + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP and OpenECOMP are trademarks + * and service marks of AT&T Intellectual Property. + */ +package org.openecomp.event; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.UriEndpointComponent; + +import java.util.Map; + +/** + * Represents the component that manages {@link EventBusEndpoint}. + */ +public class EventBusComponent extends UriEndpointComponent { + + public EventBusComponent() { + super(EventBusEndpoint.class); + } + + public EventBusComponent(CamelContext context) { + super(context, EventBusEndpoint.class); + } + + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) + throws Exception { + Endpoint endpoint = new EventBusEndpoint(uri, this); + setProperties(endpoint, parameters); + return endpoint; + } +} diff --git a/src/main/java/org/openecomp/event/EventBusConsumer.java b/src/main/java/org/openecomp/event/EventBusConsumer.java new file mode 100644 index 0000000..30ac40e --- /dev/null +++ b/src/main/java/org/openecomp/event/EventBusConsumer.java @@ -0,0 +1,181 @@ +/** + * ============LICENSE_START======================================================= + * DataRouter + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License ati + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP and OpenECOMP are trademarks + * and service marks of AT&T Intellectual Property. + */ +package org.openecomp.event; + +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; +import com.att.nsa.cambria.client.CambriaConsumer; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.impl.ScheduledPollConsumer; +import org.openecomp.cl.api.Logger; +import org.openecomp.cl.eelf.LoggerFactory; +import org.openecomp.cl.mdc.MdcContext; +import org.openecomp.logging.RouterCoreMsgs; + +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +/** + * The consumer component which is used to pull messages off of the event bus and send them to the + * next processor in the route chain. This type of consumer is based off of a scheduled poller so + * that events are pulled on a regular basis. + */ +public class EventBusConsumer extends ScheduledPollConsumer { + + private Logger logger = LoggerFactory.getInstance().getLogger(EventBusConsumer.class); + private Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(EventBusConsumer.class); + private final EventBusEndpoint endpoint; + + private CambriaConsumer consumer; + + /** + * EventBusConsumer Constructor. + */ + public EventBusConsumer(EventBusEndpoint endpoint, Processor processor) { + super(endpoint, processor); + super.setDelay(endpoint.getPollingDelay()); + this.endpoint = endpoint; + + setScheduledExecutorService(new ScheduledThreadPoolExecutor(endpoint.getPoolSize())); + + String[] urls = endpoint.getUrl().split(","); + + List<String> urlList = null; + + if (urls != null) { + urlList = Arrays.asList(urls); + } + + try { + + ConsumerBuilder consumerBuilder = new CambriaClientBuilders.ConsumerBuilder() + .usingHosts(urlList).onTopic(endpoint.getEventTopic()) + .knownAs(endpoint.getGroupName(), endpoint.getGroupId()); + + String apiKey = endpoint.getApiKey(); + String apiSecret = endpoint.getApiSecret(); + + if (apiKey != null && apiSecret != null) { + consumerBuilder.authenticatedBy(endpoint.getApiKey(), endpoint.getApiSecret()); + } + + consumer = consumerBuilder.build(); + + } catch (MalformedURLException | GeneralSecurityException e) { + logger.error(RouterCoreMsgs.EVENT_CONSUMER_CREATION_EXCEPTION, e.getLocalizedMessage()); + } + } + + /** + * Method which is called by the Camel process on a scheduled basis. This specific implementation + * reads messages off of the configured topic and schedules tasks to process them . + * + * @return the number of messages that were processed off the event queue + */ + @Override + protected int poll() throws Exception { + + logger.debug("Checking for event on topic: " + endpoint.getEventTopic()); + + int processCount = 0; + + Iterable<String> messages = null; + + messages = consumer.fetch(); + + String topic = endpoint.getEventTopic(); + + for (String message : messages) { + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody(message); + getScheduledExecutorService().submit(new EventProcessor(exchange, topic)); + ++processCount; + } + return processCount; + } + + protected void doStop() throws Exception { + super.doStop(); + if (consumer != null) { + consumer.close(); + } + } + + protected void doShutdown() throws Exception { + super.doShutdown(); + if (consumer != null) { + consumer.close(); + } + } + + /** + * Class responsible for processing messages pulled off of the event bus. + */ + private class EventProcessor implements Runnable { + + private Exchange message; + + private String topic; + + EventProcessor(Exchange message, String topic) { + this.message = message; + this.topic = topic; + } + + public void run() { + try { + + MdcContext.initialize(UUID.randomUUID().toString(), "DataRouter", "", "Event-Bus", ""); + + // Sends the message to the next processor in the defined Camel route + getProcessor().process(message); + + Message response = message.getOut(); + if (response != null) { + logger.debug("Routing response: " + response.getBody()); + } + + } catch (Exception e) { + logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e.getLocalizedMessage()); + } finally { + // log exception if an exception occurred and was not handled + if (message.getException() != null) { + logger.info(RouterCoreMsgs.PROCESS_EVENT, topic, "FAILURE"); + auditLogger.info(RouterCoreMsgs.PROCESS_EVENT, topic, "FAILURE"); + } else { + logger.info(RouterCoreMsgs.PROCESS_EVENT, topic, "SUCCESS"); + auditLogger.info(RouterCoreMsgs.PROCESS_EVENT, topic, "SUCCESS"); + } + } + } + } +} diff --git a/src/main/java/org/openecomp/event/EventBusEndpoint.java b/src/main/java/org/openecomp/event/EventBusEndpoint.java new file mode 100644 index 0000000..470b974 --- /dev/null +++ b/src/main/java/org/openecomp/event/EventBusEndpoint.java @@ -0,0 +1,165 @@ +/** + * ============LICENSE_START======================================================= + * DataRouter + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License ati + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP and OpenECOMP are trademarks + * and service marks of AT&T Intellectual Property. + */ +package org.openecomp.event; + + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.eclipse.jetty.util.security.Password; + +/** + * Represents a EventBus endpoint. + */ +@UriEndpoint(scheme = "event-bus", syntax = "event-bus:name", + consumerClass = EventBusConsumer.class, title = "event-bus") +public class EventBusEndpoint extends DefaultEndpoint { + @UriPath + @Metadata(required = "true") + private String name; + + @UriParam(label = "eventTopic") + @Metadata(required = "true") + private String eventTopic; + @UriParam(label = "groupName") + @Metadata(required = "true") + private String groupName; + @UriParam(label = "groupId") + @Metadata(required = "true") + private String groupId; + @UriParam(label = "apiKey") + private String apiKey; + @UriParam(label = "apiSecret") + private String apiSecret; + @UriParam(label = "url") + @Metadata(required = "true") + private String url; + @UriParam(label = "poolSize") + @Metadata(required = "true", defaultValue="20") + private int poolSize = 20; + @UriParam(label = "pollingDelay") + @Metadata(required = "true", defaultValue="30000") + private int pollingDelay = 30000; + + public EventBusEndpoint() {} + + public EventBusEndpoint(String uri, EventBusComponent component) { + super(uri, component); + } + + public EventBusEndpoint(String endpointUri) { + super(endpointUri); + } + + public Producer createProducer() throws Exception { + return new EventBusProducer(this); + } + + public Consumer createConsumer(Processor processor) throws Exception { + return new EventBusConsumer(this, processor); + } + + public boolean isSingleton() { + return false; + } + + public void setName(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public String getEventTopic() { + return eventTopic; + } + + public void setEventTopic(String eventTopic) { + this.eventTopic = eventTopic; + } + + public String getGroupName() { + return groupName; + } + + public void setGroupName(String groupName) { + this.groupName = groupName; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public String getApiKey() { + return apiKey == null ? null : Password.deobfuscate(apiKey); + } + + public void setApiKey(String apiKey) { + this.apiKey = apiKey; + } + + public String getApiSecret() { + return apiSecret == null ? null : Password.deobfuscate(apiSecret); + } + + public void setApiSecret(String apiSecret) { + this.apiSecret = apiSecret; + } + + public int getPoolSize() { + return poolSize; + } + + public void setPoolSize(int poolsize) { + this.poolSize = poolsize; + } + + public int getPollingDelay() { + return pollingDelay; + } + + public void setPollingDelay(int pollingDelay) { + this.pollingDelay = pollingDelay; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } +} + diff --git a/src/main/java/org/openecomp/event/EventBusProducer.java b/src/main/java/org/openecomp/event/EventBusProducer.java new file mode 100644 index 0000000..ecfebc3 --- /dev/null +++ b/src/main/java/org/openecomp/event/EventBusProducer.java @@ -0,0 +1,45 @@ +/** + * ============LICENSE_START======================================================= + * DataRouter + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License ati + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP and OpenECOMP are trademarks + * and service marks of AT&T Intellectual Property. + */ +package org.openecomp.event; + +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; + +/** + * The EventBus producer. + */ +public class EventBusProducer extends DefaultProducer { + private EventBusEndpoint endpoint; + + public EventBusProducer(EventBusEndpoint endpoint) { + super(endpoint); + this.endpoint = endpoint; + } + + public void process(Exchange exchange) throws Exception { + // Publishing to event bus is currently not supported + } + +} |