aboutsummaryrefslogtreecommitdiffstats
path: root/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java
diff options
context:
space:
mode:
Diffstat (limited to 'services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java')
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java294
1 files changed, 294 insertions, 0 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java
new file mode 100644
index 000000000..25f92d843
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java
@@ -0,0 +1,294 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This class holds a cache of the synchronous events sent into Apex and that have not yet been replied to. It runs a thread to time out events that have not
+ * been replied to in the specified timeout.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class SynchronousEventCache extends PeeredReference implements Runnable {
+ // Get a reference to the logger
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(SynchronousEventCache.class);
+
+ // The default amount of time to wait for a synchronous event to be replied to is 1 second
+ private static final long DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT = 1000;
+
+ // The timeout to wait between event polls in milliseconds and the time to wait for the thread to stop
+ private static final long OUTSTANDING_EVENT_POLL_TIMEOUT = 50;
+ private static final long CACHE_STOP_WAIT_INTERVAL = 10;
+
+ // The time in milliseconds to wait for the reply to a sent synchronous event
+ private long synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
+
+ // Map holding outstanding synchronous events
+ private final Map<Long, SimpleEntry<Long, Object>> toApexEventMap = new HashMap<Long, SimpleEntry<Long, Object>>();
+
+ // Map holding reply events
+ private final Map<Long, SimpleEntry<Long, Object>> fromApexEventMap = new HashMap<Long, SimpleEntry<Long, Object>>();
+
+ // The message listener thread and stopping flag
+ private final Thread synchronousEventCacheThread;
+ private boolean stopOrderedFlag = false;
+
+ /**
+ * Create a synchronous event cache that caches outstanding synchronous Apex events.
+ *
+ * @param peeredMode the peered mode for which to return the reference
+ * @param consumer the consumer that is populating the cache
+ * @param producer the producer that is emptying the cache
+ * @param synchronousEventTimeout the time in milliseconds to wait for the reply to a sent synchronous event
+ */
+ public SynchronousEventCache(final EventHandlerPeeredMode peeredMode, final ApexEventConsumer consumer, final ApexEventProducer producer, final long synchronousEventTimeout) {
+ super(peeredMode, consumer, producer);
+
+ if (synchronousEventTimeout != 0) {
+ this.synchronousEventTimeout = synchronousEventTimeout;
+ }
+ else {
+ this.synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
+ }
+
+ // Start scanning the outstanding events
+ synchronousEventCacheThread = new Thread(this);
+ synchronousEventCacheThread.setDaemon(true);
+ synchronousEventCacheThread.start();
+ }
+
+ /**
+ * Gets the timeout value for synchronous events.
+ *
+ * @return the synchronous event timeout
+ */
+ public long getSynchronousEventTimeout() {
+ return synchronousEventTimeout;
+ }
+
+ /**
+ * Cache a synchronized event sent into Apex in the event cache.
+ *
+ * @param executionId the execution ID that was assigned to the event
+ * @param event the apex event
+ */
+ public void cacheSynchronizedEventToApex(final long executionId, final Object event) {
+ // Add the event to the map
+ synchronized (toApexEventMap) {
+ cacheSynchronizedEvent(toApexEventMap, executionId, event);
+ }
+ }
+
+ /**
+ * Remove the record of an event sent to Apex if it exists in the cache.
+ *
+ * @param executionId the execution ID of the event
+ * @return The removed event
+ */
+ public Object removeCachedEventToApexIfExists(final long executionId) {
+ synchronized (toApexEventMap) {
+ return removeCachedEventIfExists(toApexEventMap, executionId);
+ }
+ }
+
+ /**
+ * Check if an event exists in the to apex cache.
+ *
+ * @param executionId the execution ID of the event
+ * @return true if the event exists, false otherwise
+ */
+ public boolean existsEventToApex(final long executionId) {
+ synchronized (toApexEventMap) {
+ return toApexEventMap.containsKey(executionId);
+ }
+ }
+
+ /**
+ * Cache synchronized event received from Apex in the event cache.
+ *
+ * @param executionId the execution ID of the event
+ * @param event the apex event
+ */
+ public void cacheSynchronizedEventFromApex(final long executionId, final Object event) {
+ // Add the event to the map
+ synchronized (fromApexEventMap) {
+ cacheSynchronizedEvent(fromApexEventMap, executionId, event);
+ }
+ }
+
+ /**
+ * Remove the record of an event received from Apex if it exists in the cache.
+ *
+ * @param executionId the execution ID of the event
+ * @return The removed event
+ */
+ public Object removeCachedEventFromApexIfExists(final long executionId) {
+ synchronized (fromApexEventMap) {
+ return removeCachedEventIfExists(fromApexEventMap, executionId);
+ }
+ }
+
+ /**
+ * Check if an event exists in the from apex cache.
+ *
+ * @param executionId the execution ID of the event
+ * @return true if the event exists, false otherwise
+ */
+ public boolean existsEventFromApex(final long executionId) {
+ synchronized (fromApexEventMap) {
+ return fromApexEventMap.containsKey(executionId);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ LOGGER.entry();
+
+ // Periodic scan of outstanding events
+ while (synchronousEventCacheThread.isAlive() && !stopOrderedFlag) {
+ ThreadUtilities.sleep(OUTSTANDING_EVENT_POLL_TIMEOUT);
+
+ // Check for timeouts on events
+ synchronized (toApexEventMap) {
+ timeoutEventsOnCache(toApexEventMap);
+ }
+ synchronized (fromApexEventMap) {
+ timeoutEventsOnCache(fromApexEventMap);
+ }
+ }
+
+ LOGGER.exit();
+ }
+
+ /**
+ * Stops the scanning thread and clears the cache.
+ */
+ public synchronized void stop() {
+ LOGGER.entry();
+ stopOrderedFlag = true;
+
+ while (synchronousEventCacheThread.isAlive()) {
+ ThreadUtilities.sleep(CACHE_STOP_WAIT_INTERVAL);
+ }
+
+ // Check if there are any unprocessed events
+ if (!toApexEventMap.isEmpty()) {
+ LOGGER.warn(toApexEventMap.size() + " synchronous events dropped due to system shutdown");
+ }
+
+ toApexEventMap.clear();
+ LOGGER.exit();
+ }
+
+ /**
+ * Cache a synchronized event sent in an event cache.
+ * @param eventCacheMap the map to cache the event on
+ * @param executionId the execution ID of the event
+ * @param event the event to cache
+ */
+ private void cacheSynchronizedEvent(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap, final long executionId, final Object event) {
+ LOGGER.entry("Adding event with execution ID: " + executionId);
+
+ // Check if the event is already in the cache
+ if (eventCacheMap.containsKey(executionId)) {
+ // If there was no sent event then the event timed out or some unexpected event was received
+ final String errorMessage = "an event with ID " + executionId
+ + " already exists in the synchronous event cache, execution IDs must be unique in the system";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+
+ // Add the event to the map
+ eventCacheMap.put(executionId, new SimpleEntry<Long, Object>(System.currentTimeMillis(), event));
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("event has been cached:" + event);
+ }
+
+ LOGGER.exit("Added: " + executionId);
+ }
+
+ /**
+ * Remove the record of an event if it exists in the cache.
+ *
+ * @param eventCacheMap the map to remove the event from
+ * @param executionId the execution ID of the event
+ * @return The removed event
+ */
+ private Object removeCachedEventIfExists(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap, final long executionId) {
+ LOGGER.entry("Removing: " + executionId);
+
+ final SimpleEntry<Long, Object> removedEventEntry = eventCacheMap.remove(executionId);
+
+ if (removedEventEntry != null) {
+ LOGGER.exit("Removed: " + executionId);
+ return removedEventEntry.getValue();
+ }
+ else {
+ // The event may not be one of the events in our cache, so we just ignore removal failures
+ return null;
+ }
+ }
+
+ /**
+ * Time out events on an event cache map. Events that have a timeout longer than the configured timeout are timed out.
+ * @param eventCacheMap the event cache to operate on
+ */
+ private void timeoutEventsOnCache(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap) {
+ // Use a set to keep track of the events that have timed out
+ final Set<Long> timedOutEventSet = new HashSet<>();
+
+ for (final Entry<Long, SimpleEntry<Long, Object>> cachedEventEntry : eventCacheMap.entrySet()) {
+ // The amount of time we are waiting for the event reply
+ final long eventWaitTime = System.currentTimeMillis() - cachedEventEntry.getValue().getKey();
+
+ // Have we a timeout?
+ if (eventWaitTime > synchronousEventTimeout) {
+ timedOutEventSet.add(cachedEventEntry.getKey());
+ }
+ }
+
+ // Remove timed out events from the map
+ for (final long timedoutEventExecutionID : timedOutEventSet) {
+ // Remove the map entry and issue a warning
+ final SimpleEntry<Long, Object> timedOutEventEntry = eventCacheMap.remove(timedoutEventExecutionID);
+
+ LOGGER.warn("synchronous event timed out, reply not received in " + synchronousEventTimeout + " milliseconds on event "
+ + timedOutEventEntry.getValue());
+ }
+ }
+}