summaryrefslogtreecommitdiffstats
path: root/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main
diff options
context:
space:
mode:
authorAnand <ac204h@att.com>2018-01-04 19:35:51 -0500
committerSkip Wonnell <skip@att.com>2018-01-08 22:09:50 +0000
commit36bcd566167f2f91c0e8e7a304fce5f6bc150776 (patch)
tree7ba7acfee7e520da83a2b6286ea464285bc8cf67 /appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main
parent38d293d605b42f88c9c82319ba848b4b81e45b64 (diff)
Include impacted changes for APPC-346,APPC-348
Issue-ID: APPC-347 Change-Id: I399bc2a1e0dfd481e103032a373bb80fce5baf41 Signed-off-by: Anand <ac204h@att.com>
Diffstat (limited to 'appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main')
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java1
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java20
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java38
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java61
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java28
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java15
6 files changed, 54 insertions, 109 deletions
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java
index 1423962ef..2c4aa0853 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java
@@ -31,5 +31,4 @@ import org.onap.appc.exceptions.APPCException;
public interface ExecutionQueueService<M extends Runnable> {
void putMessage(M message) throws APPCException;
void putMessage(M message, long timeout, TimeUnit unit) throws APPCException;
- void registerMessageExpirationListener(MessageExpirationListener listener);
}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java
index 164a8b563..09d49deef 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java
@@ -24,8 +24,6 @@
package org.onap.appc.executionqueue.helper;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
import org.onap.appc.configuration.Configuration;
import org.onap.appc.configuration.ConfigurationFactory;
@@ -35,11 +33,10 @@ import java.util.concurrent.atomic.AtomicInteger;
public class Util {
- private final EELFLogger logger = EELFManager.getInstance().getLogger(Util.class);
- private final int default_queue_size = 10;
- private final int default_threadpool_size = 10;
- private final String queue_size_key = "appc.dispatcher.executionqueue.backlog.size";
- private final String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size";
+ private int default_queue_size = 10;
+ private int default_threadpool_size = 10;
+ private String queue_size_key = "appc.dispatcher.executionqueue.backlog.size";
+ private String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size";
private Configuration configuration;
@@ -48,7 +45,6 @@ public class Util {
* <p>Used by blueprint.
*/
public void init() {
-
configuration = ConfigurationFactory.getConfiguration();
}
@@ -59,7 +55,7 @@ public class Util {
try {
size = Integer.parseInt(sizeStr);
} catch (NumberFormatException e) {
- logger.error("Error while parse key:" + queue_size_key + " got from configuration " + e.getMessage(), e);
+
}
return size;
@@ -72,8 +68,7 @@ public class Util {
try {
size = Integer.parseInt(sizeStr);
} catch (NumberFormatException e) {
- logger.error("Error while parse key:" + threadpool_size_key + " got from configuration "
- + e.getMessage(), e);
+
}
return size;
@@ -89,8 +84,7 @@ public class Util {
Thread t = factory.newThread(r);
t.setDaemon(isDaemon);
if (threadNamePrefix != null && !threadNamePrefix.isEmpty()) {
- final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter
- .incrementAndGet());
+ final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter.incrementAndGet());
t.setName(threadName);
}
return t;
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java
deleted file mode 100644
index f071be6f0..000000000
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.executionqueue.impl;
-
-import org.onap.appc.executionqueue.ExecutionQueueService;
-
-public class ExecutionQueueServiceFactory {
-
- private static class ExecutionQueueServiceHolder {
- public static final ExecutionQueueService executionQueueService = new ExecutionQueueServiceImpl();
- }
-
- public static ExecutionQueueService getExecutionQueueService() {
- return ExecutionQueueServiceHolder.executionQueueService;
- }
-}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
index 0634a0eb2..027cc9d4b 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
@@ -31,58 +31,59 @@ import org.onap.appc.executionqueue.ExecutionQueueService;
import org.onap.appc.executionqueue.MessageExpirationListener;
import org.onap.appc.executionqueue.impl.object.QueueMessage;
-import java.time.Instant;
+import java.util.Calendar;
+import java.util.Date;
import java.util.concurrent.TimeUnit;
public class ExecutionQueueServiceImpl<M extends Runnable> implements ExecutionQueueService<M> {
- private static final EELFLogger logger =
- EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class);
+ private final EELFLogger logger = EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class);
private QueueManager queueManager;
- public ExecutionQueueServiceImpl() {
- //do nothing
+ public ExecutionQueueServiceImpl(){
+
+ }
+
+ @Override
+ public void putMessage(M message) throws APPCException {
+ this.putMessage(message,-1,null);
}
/**
* Injected by blueprint
- *
- * @param queueManager queue manager to be set
+ * @param queueManager
*/
public void setQueueManager(QueueManager queueManager) {
this.queueManager = queueManager;
}
@Override
- public void putMessage(M message) throws APPCException {
- this.putMessage(message, -1, null);
- }
+ public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{
+ QueueMessage queueMessage;
- @Override
- public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException {
- Instant expirationTime = calculateExpirationTime(timeout, unit);
- boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<>(message, expirationTime));
- if (!enqueueTask) {
- logger.error("Error in putMessage method of ExecutionQueueServiceImpl");
- throw new APPCException("Failed to put message in queue");
+ try {
+ Date expirationTime = calculateExpirationTime(timeout,unit);
+ queueMessage = new QueueMessage(message,expirationTime);
+ boolean enqueueTask = queueManager.enqueueTask(queueMessage);
+ if(!enqueueTask){
+ throw new APPCException("failed to put message in queue");
+ }
+ } catch (Exception e) {
+ logger.error("Error in putMessage method of ExecutionQueueServiceImpl" + e.getMessage());
+ throw new APPCException(e);
}
}
- @Override
- public void registerMessageExpirationListener(MessageExpirationListener listener) {
- queueManager.setListener(listener);
- }
-
- private Instant calculateExpirationTime(long timeToLive, TimeUnit unit) {
- if (timeToLive > 0 && unit != null) {
- // as of Java 8, there is no built-in conversion method from
- // TimeUnit to ChronoUnit; do it manually
- return Instant.now().plusMillis(unit.toMillis(timeToLive));
- } else {
- // never expires
- return Instant.MAX;
+ private Date calculateExpirationTime(long timeToLive, TimeUnit unit) {
+ Date expirationTime = null;
+ if(timeToLive > 0){
+ long currentTime = System.currentTimeMillis();
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(currentTime + unit.toMillis(timeToLive));
+ expirationTime = cal.getTime();
}
+ return expirationTime;
}
}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java
index db0e3d4c5..c33c66042 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java
@@ -41,14 +41,14 @@ public class QueueManager {
private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class);
- private MessageExpirationListener listener;
private ExecutorService messageExecutor;
+ private LinkedBlockingQueue<QueueMessage> queue;
private int max_thread_size;
private int max_queue_size;
private Util executionQueueUtil;
public QueueManager() {
- //do nothing
+
}
/**
@@ -90,14 +90,10 @@ public class QueueManager {
}
}
- public void setListener(MessageExpirationListener listener) {
- this.listener = listener;
- }
-
/**
* Injected by blueprint
*
- * @param executionQueueUtil Util to be set
+ * @param executionQueueUtil
*/
public void setExecutionQueueUtil(Util executionQueueUtil) {
this.executionQueueUtil = executionQueueUtil;
@@ -106,22 +102,16 @@ public class QueueManager {
public boolean enqueueTask(QueueMessage queueMessage) {
boolean isEnqueued = true;
try {
- messageExecutor.execute(() -> {
- if (queueMessage.isExpired()) {
- logger.debug("Message expired " + queueMessage.getMessage());
- if (listener != null) {
- listener.onMessageExpiration(queueMessage.getMessage());
- } else {
- logger.warn("Listener not available for expired message ");
- }
- } else {
- queueMessage.getMessage().run();
- }
- });
+ messageExecutor.execute(() -> queueMessage.getMessage().run());
} catch (RejectedExecutionException ree) {
isEnqueued = false;
}
return isEnqueued;
}
+
+ private boolean messageExpired(QueueMessage queueMessage) {
+ return queueMessage.getExpirationTime() != null &&
+ queueMessage.getExpirationTime().getTime() < System.currentTimeMillis();
+ }
}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java
index bb48da7e5..75d1275d2 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java
@@ -24,23 +24,22 @@
package org.onap.appc.executionqueue.impl.object;
-import java.time.Instant;
-import java.util.Objects;
+import java.util.Date;
public class QueueMessage<M extends Runnable> {
- private final M message;
- private final Instant expirationTime;
- public QueueMessage(M message, Instant expirationTime){
+ M message;
+ Date expirationTime;
+ public QueueMessage(M message, Date expirationTime){
this.message = message;
- this.expirationTime = Objects.requireNonNull(expirationTime);
+ this.expirationTime = expirationTime;
}
public M getMessage() {
return message;
}
- public boolean isExpired() {
- return expirationTime.isBefore(Instant.now());
+ public Date getExpirationTime() {
+ return expirationTime;
}
}