summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java5
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java10
2 files changed, 6 insertions, 9 deletions
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
index c2be1b4ac..2ac383696 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
@@ -48,13 +48,10 @@ public class ExecutionQueueServiceImpl<M extends Runnable> implements ExecutionQ
@Override
public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{
- QueueMessage queueMessage = null;
-
try {
Date expirationTime = calculateExpirationTime(timeout,unit);
- queueMessage = new QueueMessage(message,expirationTime);
QueueManager queueManager = QueueManager.getInstance();
- boolean enqueueTask = queueManager.enqueueTask(queueMessage);
+ boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<M>(message,expirationTime));
if(!enqueueTask){
throw new APPCException("failed to put message in queue");
}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java
index 2d4907fa1..cf625b4d5 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java
@@ -33,7 +33,7 @@ import com.att.eelf.configuration.EELFManager;
public class QueueManager {
- private LinkedBlockingQueue<QueueMessage> queue;
+ private LinkedBlockingQueue<QueueMessage<? extends Runnable>> queue;
private MessageExpirationListener listener;
@@ -59,7 +59,7 @@ public class QueueManager {
}
private void init(){
- queue = new LinkedBlockingQueue(MAX_QUEUE_SIZE);
+ queue = new LinkedBlockingQueue<QueueMessage<? extends Runnable>>(MAX_QUEUE_SIZE);
messageExecutor = Executors.newFixedThreadPool(MAX_THREAD_SIZE,Util.getThreadFactory(true));
for(int i=0;i<MAX_THREAD_SIZE;i++){
@@ -68,7 +68,7 @@ public class QueueManager {
public void run() {
while (true){
try{
- QueueMessage queueMessage = queue.take();
+ QueueMessage<? extends Runnable> queueMessage = queue.take();
if(messageExpired(queueMessage)){
logger.debug("Message expired "+ queueMessage.getMessage());
if(listener != null){
@@ -94,11 +94,11 @@ public class QueueManager {
this.listener = listener;
}
- public boolean enqueueTask(QueueMessage queueMessage) {
+ public boolean enqueueTask(QueueMessage<? extends Runnable> queueMessage) {
return queue.offer(queueMessage);
}
- private boolean messageExpired(QueueMessage queueMessage) {
+ private boolean messageExpired(QueueMessage<? extends Runnable> queueMessage) {
if(queueMessage.getExpirationTime() != null){
return queueMessage.getExpirationTime().getTime() < System.currentTimeMillis();
}