diff options
Diffstat (limited to 'src/main')
3 files changed, 11 insertions, 10 deletions
diff --git a/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java b/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java index 58795b3..be713ff 100644 --- a/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java +++ b/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java @@ -189,7 +189,7 @@ public class OffsetManager { * * @return - The next 'safe' offset. */ - public long getNextOffsetToCommit() { + public Long getNextOffsetToCommit() { return nextOffsetToCommit; } diff --git a/src/main/java/org/onap/aai/spike/service/EchoService.java b/src/main/java/org/onap/aai/spike/service/EchoService.java index 6c5b5ee..cf3d1b7 100644 --- a/src/main/java/org/onap/aai/spike/service/EchoService.java +++ b/src/main/java/org/onap/aai/spike/service/EchoService.java @@ -76,8 +76,7 @@ public class EchoService { auditLogger.info(SpikeMsgs.PROCESS_REST_REQUEST, new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, Status.OK.toString()) .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, Status.OK.toString()), - (req != null) ? req.getMethod() : "Unknown", (req != null) ? req.getRequestURL().toString() : "Unknown", - (req != null) ? req.getRemoteHost() : "Unknown", Status.OK.toString()); + req.getMethod(), req.getRequestURL().toString(), req.getRemoteHost(), Status.OK.toString()); MDC.clear(); return new ResponseEntity<>("Alive", HttpStatus.OK); diff --git a/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java b/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java index cd404b0..a18590a 100644 --- a/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java +++ b/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java @@ -20,14 +20,14 @@ */ package org.onap.aai.spike.service; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import java.util.ArrayList; import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.PriorityBlockingQueue; import javax.naming.OperationNotSupportedException; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.event.api.EventConsumer; @@ -40,7 +40,6 @@ import org.onap.aai.spike.event.incoming.OffsetManager; import org.onap.aai.spike.event.outgoing.SpikeEventComparator; import org.onap.aai.spike.event.outgoing.SpikeEventExclusionStrategy; import org.onap.aai.spike.event.outgoing.SpikeGraphEvent; -import org.onap.aai.spike.exception.SpikeException; import org.onap.aai.spike.logging.SpikeMsgs; import org.onap.aai.spike.util.SpikeConstants; import org.onap.aai.spike.util.SpikeProperties; @@ -90,7 +89,7 @@ public class SpikeEventProcessor extends TimerTask { } catch (Exception ex) { } - eventQueue = new PriorityBlockingQueue<SpikeGraphEvent>(eventQueueCapacity, new SpikeEventComparator()); + eventQueue = new PriorityBlockingQueue<>(eventQueueCapacity, new SpikeEventComparator()); new Thread(new SpikeEventPublisher()).start(); // Instantiate the offset manager. This will run a background thread that @@ -106,6 +105,7 @@ public class SpikeEventProcessor extends TimerTask { if (consumer == null) { logger.error(SpikeMsgs.SPIKE_SERVICE_STARTED_FAILURE, SpikeConstants.SPIKE_SERVICE_NAME); + return; } Iterable<MessageWithOffset> events = null; @@ -164,9 +164,11 @@ public class SpikeEventProcessor extends TimerTask { + modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId()); logger.debug(SpikeMsgs.SPIKE_EVENT_PROCESSED, modelEventJson); - } catch (SpikeException | InterruptedException e) { + } catch (InterruptedException e) { logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e.getMessage() + ". Incoming event payload:\n" + event.getMessage()); + // Restore the interrupted status... + Thread.currentThread().interrupt(); } catch (Exception e) { logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e.getMessage() + ". Incoming event payload:\n" + event.getMessage()); @@ -252,9 +254,9 @@ public class SpikeEventProcessor extends TimerTask { } } catch (InterruptedException e) { - // Restore the interrupted status. Thread.currentThread().interrupt(); + continue; } // Try publishing the event to the event bus. This call will block @@ -288,7 +290,7 @@ public class SpikeEventProcessor extends TimerTask { try { Thread.sleep(60000); } catch (InterruptedException e1) { - e1.printStackTrace(); + Thread.currentThread().interrupt(); } } catch (Exception e) { logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, e.getMessage()); |