aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java')
-rw-r--r--src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java16
1 files changed, 9 insertions, 7 deletions
diff --git a/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java b/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java
index cd404b0..a18590a 100644
--- a/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java
+++ b/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java
@@ -20,14 +20,14 @@
*/
package org.onap.aai.spike.service;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.PriorityBlockingQueue;
import javax.naming.OperationNotSupportedException;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.event.api.EventConsumer;
@@ -40,7 +40,6 @@ import org.onap.aai.spike.event.incoming.OffsetManager;
import org.onap.aai.spike.event.outgoing.SpikeEventComparator;
import org.onap.aai.spike.event.outgoing.SpikeEventExclusionStrategy;
import org.onap.aai.spike.event.outgoing.SpikeGraphEvent;
-import org.onap.aai.spike.exception.SpikeException;
import org.onap.aai.spike.logging.SpikeMsgs;
import org.onap.aai.spike.util.SpikeConstants;
import org.onap.aai.spike.util.SpikeProperties;
@@ -90,7 +89,7 @@ public class SpikeEventProcessor extends TimerTask {
} catch (Exception ex) {
}
- eventQueue = new PriorityBlockingQueue<SpikeGraphEvent>(eventQueueCapacity, new SpikeEventComparator());
+ eventQueue = new PriorityBlockingQueue<>(eventQueueCapacity, new SpikeEventComparator());
new Thread(new SpikeEventPublisher()).start();
// Instantiate the offset manager. This will run a background thread that
@@ -106,6 +105,7 @@ public class SpikeEventProcessor extends TimerTask {
if (consumer == null) {
logger.error(SpikeMsgs.SPIKE_SERVICE_STARTED_FAILURE, SpikeConstants.SPIKE_SERVICE_NAME);
+ return;
}
Iterable<MessageWithOffset> events = null;
@@ -164,9 +164,11 @@ public class SpikeEventProcessor extends TimerTask {
+ modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId());
logger.debug(SpikeMsgs.SPIKE_EVENT_PROCESSED, modelEventJson);
- } catch (SpikeException | InterruptedException e) {
+ } catch (InterruptedException e) {
logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
e.getMessage() + ". Incoming event payload:\n" + event.getMessage());
+ // Restore the interrupted status...
+ Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
e.getMessage() + ". Incoming event payload:\n" + event.getMessage());
@@ -252,9 +254,9 @@ public class SpikeEventProcessor extends TimerTask {
}
} catch (InterruptedException e) {
-
// Restore the interrupted status.
Thread.currentThread().interrupt();
+ continue;
}
// Try publishing the event to the event bus. This call will block
@@ -288,7 +290,7 @@ public class SpikeEventProcessor extends TimerTask {
try {
Thread.sleep(60000);
} catch (InterruptedException e1) {
- e1.printStackTrace();
+ Thread.currentThread().interrupt();
}
} catch (Exception e) {
logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, e.getMessage());