diff options
2 files changed, 6 insertions, 9 deletions
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java index c2be1b4ac..2ac383696 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java @@ -48,13 +48,10 @@ public class ExecutionQueueServiceImpl<M extends Runnable> implements ExecutionQ @Override public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{ - QueueMessage queueMessage = null; - try { Date expirationTime = calculateExpirationTime(timeout,unit); - queueMessage = new QueueMessage(message,expirationTime); QueueManager queueManager = QueueManager.getInstance(); - boolean enqueueTask = queueManager.enqueueTask(queueMessage); + boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<M>(message,expirationTime)); if(!enqueueTask){ throw new APPCException("failed to put message in queue"); } diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java index 2d4907fa1..cf625b4d5 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java @@ -33,7 +33,7 @@ import com.att.eelf.configuration.EELFManager; public class QueueManager { - private LinkedBlockingQueue<QueueMessage> queue; + private LinkedBlockingQueue<QueueMessage<? extends Runnable>> queue; private MessageExpirationListener listener; @@ -59,7 +59,7 @@ public class QueueManager { } private void init(){ - queue = new LinkedBlockingQueue(MAX_QUEUE_SIZE); + queue = new LinkedBlockingQueue<QueueMessage<? extends Runnable>>(MAX_QUEUE_SIZE); messageExecutor = Executors.newFixedThreadPool(MAX_THREAD_SIZE,Util.getThreadFactory(true)); for(int i=0;i<MAX_THREAD_SIZE;i++){ @@ -68,7 +68,7 @@ public class QueueManager { public void run() { while (true){ try{ - QueueMessage queueMessage = queue.take(); + QueueMessage<? extends Runnable> queueMessage = queue.take(); if(messageExpired(queueMessage)){ logger.debug("Message expired "+ queueMessage.getMessage()); if(listener != null){ @@ -94,11 +94,11 @@ public class QueueManager { this.listener = listener; } - public boolean enqueueTask(QueueMessage queueMessage) { + public boolean enqueueTask(QueueMessage<? extends Runnable> queueMessage) { return queue.offer(queueMessage); } - private boolean messageExpired(QueueMessage queueMessage) { + private boolean messageExpired(QueueMessage<? extends Runnable> queueMessage) { if(queueMessage.getExpirationTime() != null){ return queueMessage.getExpirationTime().getTime() < System.currentTimeMillis(); } |