aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java2
-rw-r--r--src/main/java/org/onap/aai/spike/service/EchoService.java3
-rw-r--r--src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java16
3 files changed, 11 insertions, 10 deletions
diff --git a/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java b/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java
index 58795b3..be713ff 100644
--- a/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java
+++ b/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java
@@ -189,7 +189,7 @@ public class OffsetManager {
*
* @return - The next 'safe' offset.
*/
- public long getNextOffsetToCommit() {
+ public Long getNextOffsetToCommit() {
return nextOffsetToCommit;
}
diff --git a/src/main/java/org/onap/aai/spike/service/EchoService.java b/src/main/java/org/onap/aai/spike/service/EchoService.java
index 6c5b5ee..cf3d1b7 100644
--- a/src/main/java/org/onap/aai/spike/service/EchoService.java
+++ b/src/main/java/org/onap/aai/spike/service/EchoService.java
@@ -76,8 +76,7 @@ public class EchoService {
auditLogger.info(SpikeMsgs.PROCESS_REST_REQUEST,
new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, Status.OK.toString())
.setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, Status.OK.toString()),
- (req != null) ? req.getMethod() : "Unknown", (req != null) ? req.getRequestURL().toString() : "Unknown",
- (req != null) ? req.getRemoteHost() : "Unknown", Status.OK.toString());
+ req.getMethod(), req.getRequestURL().toString(), req.getRemoteHost(), Status.OK.toString());
MDC.clear();
return new ResponseEntity<>("Alive", HttpStatus.OK);
diff --git a/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java b/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java
index cd404b0..a18590a 100644
--- a/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java
+++ b/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java
@@ -20,14 +20,14 @@
*/
package org.onap.aai.spike.service;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.PriorityBlockingQueue;
import javax.naming.OperationNotSupportedException;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.event.api.EventConsumer;
@@ -40,7 +40,6 @@ import org.onap.aai.spike.event.incoming.OffsetManager;
import org.onap.aai.spike.event.outgoing.SpikeEventComparator;
import org.onap.aai.spike.event.outgoing.SpikeEventExclusionStrategy;
import org.onap.aai.spike.event.outgoing.SpikeGraphEvent;
-import org.onap.aai.spike.exception.SpikeException;
import org.onap.aai.spike.logging.SpikeMsgs;
import org.onap.aai.spike.util.SpikeConstants;
import org.onap.aai.spike.util.SpikeProperties;
@@ -90,7 +89,7 @@ public class SpikeEventProcessor extends TimerTask {
} catch (Exception ex) {
}
- eventQueue = new PriorityBlockingQueue<SpikeGraphEvent>(eventQueueCapacity, new SpikeEventComparator());
+ eventQueue = new PriorityBlockingQueue<>(eventQueueCapacity, new SpikeEventComparator());
new Thread(new SpikeEventPublisher()).start();
// Instantiate the offset manager. This will run a background thread that
@@ -106,6 +105,7 @@ public class SpikeEventProcessor extends TimerTask {
if (consumer == null) {
logger.error(SpikeMsgs.SPIKE_SERVICE_STARTED_FAILURE, SpikeConstants.SPIKE_SERVICE_NAME);
+ return;
}
Iterable<MessageWithOffset> events = null;
@@ -164,9 +164,11 @@ public class SpikeEventProcessor extends TimerTask {
+ modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId());
logger.debug(SpikeMsgs.SPIKE_EVENT_PROCESSED, modelEventJson);
- } catch (SpikeException | InterruptedException e) {
+ } catch (InterruptedException e) {
logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
e.getMessage() + ". Incoming event payload:\n" + event.getMessage());
+ // Restore the interrupted status...
+ Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
e.getMessage() + ". Incoming event payload:\n" + event.getMessage());
@@ -252,9 +254,9 @@ public class SpikeEventProcessor extends TimerTask {
}
} catch (InterruptedException e) {
-
// Restore the interrupted status.
Thread.currentThread().interrupt();
+ continue;
}
// Try publishing the event to the event bus. This call will block
@@ -288,7 +290,7 @@ public class SpikeEventProcessor extends TimerTask {
try {
Thread.sleep(60000);
} catch (InterruptedException e1) {
- e1.printStackTrace();
+ Thread.currentThread().interrupt();
}
} catch (Exception e) {
logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, e.getMessage());