summaryrefslogtreecommitdiffstats
path: root/appc-dispatcher/appc-dispatcher-common
diff options
context:
space:
mode:
Diffstat (limited to 'appc-dispatcher/appc-dispatcher-common')
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java67
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java47
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java111
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java (renamed from appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java)57
4 files changed, 155 insertions, 127 deletions
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java
index 4f97a97f9..8670adabd 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java
@@ -24,49 +24,76 @@
package org.openecomp.appc.executionqueue.helper;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import org.openecomp.appc.configuration.Configuration;
import org.openecomp.appc.configuration.ConfigurationFactory;
+import org.openecomp.appc.executionqueue.impl.QueueManager;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
public class Util {
- private static final Configuration configuration = ConfigurationFactory.getConfiguration();
+ private final EELFLogger logger = EELFManager.getInstance().getLogger(Util.class);
+ private final int default_queue_size = 10;
+ private final int default_threadpool_size = 10;
+ private final String queue_size_key = "appc.dispatcher.executionqueue.backlog.size";
+ private final String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size";
+
+ private Configuration configuration;
- public static int DEFAULT_QUEUE_SIZE = 10;
- public static int DEFAULT_THREADPOOL_SIZE = 10;
+ /**
+ * Initialization.
+ * <p>Used by blueprint.
+ */
+ public void init() {
- public static int getExecutionQueSize(){
- String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.backlog.size", String.valueOf(DEFAULT_QUEUE_SIZE));
- int size = DEFAULT_QUEUE_SIZE;
- try{
+ configuration = ConfigurationFactory.getConfiguration();
+ }
+
+ public int getExecutionQueueSize() {
+ String sizeStr = configuration.getProperty(queue_size_key, String.valueOf(default_queue_size));
+
+ int size = default_queue_size;
+ try {
size = Integer.parseInt(sizeStr);
+ } catch (NumberFormatException e) {
+ logger.error("Error while parse key:" + queue_size_key + " got from configuration " + e.getMessage(), e);
}
- catch (NumberFormatException e){
- }
return size;
}
- public static int getThreadPoolSize(){
- String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.threadpool.size", String.valueOf(DEFAULT_THREADPOOL_SIZE));
- int size = DEFAULT_THREADPOOL_SIZE;
- try{
+ public int getThreadPoolSize() {
+ String sizeStr = configuration.getProperty(threadpool_size_key, String.valueOf(default_threadpool_size));
+
+ int size = default_threadpool_size;
+ try {
size = Integer.parseInt(sizeStr);
+ } catch (NumberFormatException e) {
+ logger.error("Error while parse key:" + threadpool_size_key + " got from configuration "
+ + e.getMessage(), e);
}
- catch (NumberFormatException e){
- }
return size;
}
- public static ThreadFactory getThreadFactory(final boolean isDaemon){
+ public ThreadFactory getThreadFactory(final boolean isDaemon, final String threadNamePrefix) {
return new ThreadFactory() {
- final ThreadFactory factory = Executors.defaultThreadFactory();
+ private final String THREAD_NAME_PATTERN = "%s-%d";
+ private final ThreadFactory factory = Executors.defaultThreadFactory();
+ private final AtomicInteger counter = new AtomicInteger();
+
public Thread newThread(Runnable r) {
Thread t = factory.newThread(r);
t.setDaemon(isDaemon);
+ if (threadNamePrefix != null && !threadNamePrefix.isEmpty()) {
+ final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter
+ .incrementAndGet());
+ t.setName(threadName);
+ }
return t;
}
};
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
index 3092bd881..c29078c27 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
@@ -24,51 +24,54 @@
package org.openecomp.appc.executionqueue.impl;
-import java.time.Instant;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import org.openecomp.appc.exceptions.APPCException;
import org.openecomp.appc.executionqueue.ExecutionQueueService;
import org.openecomp.appc.executionqueue.MessageExpirationListener;
import org.openecomp.appc.executionqueue.impl.object.QueueMessage;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
public class ExecutionQueueServiceImpl<M extends Runnable> implements ExecutionQueueService<M> {
private static final EELFLogger logger =
- EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class);
+ EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class);
- ExecutionQueueServiceImpl(){
+ private QueueManager queueManager;
+
+ public ExecutionQueueServiceImpl() {
+ //do nothing
+ }
+ /**
+ * Injected by blueprint
+ *
+ * @param queueManager queue manager to be set
+ */
+ public void setQueueManager(QueueManager queueManager) {
+ this.queueManager = queueManager;
}
@Override
public void putMessage(M message) throws APPCException {
- this.putMessage(message,-1,null);
+ this.putMessage(message, -1, null);
}
@Override
- public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{
- try {
- Instant expirationTime = calculateExpirationTime(timeout,unit);
- QueueManager queueManager = QueueManager.getInstance();
- boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<M>(message,expirationTime));
- if(!enqueueTask){
- throw new APPCException("failed to put message in queue");
- }
- } catch (Exception e) {
- logger.error("Error in putMessage method of ExecutionQueueServiceImpl" + e.getMessage());
- throw new APPCException(e);
+ public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException {
+ Instant expirationTime = calculateExpirationTime(timeout, unit);
+ boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<>(message, expirationTime));
+ if (!enqueueTask) {
+ logger.error("Error in putMessage method of ExecutionQueueServiceImpl");
+ throw new APPCException("Failed to put message in queue");
}
}
@Override
public void registerMessageExpirationListener(MessageExpirationListener listener) {
- QueueManager.getInstance().setListener(listener);
+ queueManager.setListener(listener);
}
private Instant calculateExpirationTime(long timeToLive, TimeUnit unit) {
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java
index 11d0b8d69..b78f399e0 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java
@@ -24,82 +24,87 @@
package org.openecomp.appc.executionqueue.impl;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import org.openecomp.appc.executionqueue.MessageExpirationListener;
import org.openecomp.appc.executionqueue.helper.Util;
import org.openecomp.appc.executionqueue.impl.object.QueueMessage;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
public class QueueManager {
- private LinkedBlockingQueue<QueueMessage<? extends Runnable>> queue;
+ private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class);
private MessageExpirationListener listener;
-
- private static int MAX_QUEUE_SIZE = Util.getExecutionQueSize();
-
- private static int MAX_THREAD_SIZE = Util.getThreadPoolSize();
-
private ExecutorService messageExecutor;
+ private int max_thread_size;
+ private int max_queue_size;
+ private Util executionQueueUtil;
- private static final EELFLogger logger =
- EELFManager.getInstance().getLogger(QueueManager.class);
+ public QueueManager() {
+ //do nothing
+ }
- private QueueManager(){
- init();
+ /**
+ * Initialization method used by blueprint
+ */
+ public void init() {
+ max_thread_size = executionQueueUtil.getThreadPoolSize();
+ max_queue_size = executionQueueUtil.getExecutionQueueSize();
+ messageExecutor = new ThreadPoolExecutor(
+ max_thread_size,
+ max_thread_size,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue(max_queue_size),
+ executionQueueUtil.getThreadFactory(true, "appc-dispatcher"),
+ new ThreadPoolExecutor.AbortPolicy());
}
- private static class QueueManagerHolder {
- private static final QueueManager INSTANCE = new QueueManager();
+ /**
+ * Destory method used by blueprint
+ */
+ public void stop() {
+ messageExecutor.shutdownNow();
}
- public static QueueManager getInstance() {
- return QueueManagerHolder.INSTANCE;
+ public void setListener(MessageExpirationListener listener) {
+ this.listener = listener;
}
- private void init(){
- queue = new LinkedBlockingQueue<QueueMessage<? extends Runnable>>(MAX_QUEUE_SIZE);
- messageExecutor = Executors.newFixedThreadPool(MAX_THREAD_SIZE,Util.getThreadFactory(true));
+ /**
+ * Injected by blueprint
+ *
+ * @param executionQueueUtil Util to be set
+ */
+ public void setExecutionQueueUtil(Util executionQueueUtil) {
+ this.executionQueueUtil = executionQueueUtil;
+ }
- for(int i=0;i<MAX_THREAD_SIZE;i++){
- messageExecutor.submit(new Runnable() {
- @Override
- public void run() {
- while (true){
- try{
- QueueMessage<? extends Runnable> queueMessage = queue.take();
- if (queueMessage.isExpired()) {
- logger.debug("Message expired "+ queueMessage.getMessage());
- if(listener != null){
- listener.onMessageExpiration(queueMessage.getMessage());
- }
- else{
- logger.warn("Listener not available for expired message ");
- }
- }
- else{
- queueMessage.getMessage().run();
- }
- } catch (Exception e) {
- logger.error("Error in startMessagePolling method of ExecutionQueueServiceImpl" + e.getMessage());
- }
+ public boolean enqueueTask(QueueMessage queueMessage) {
+ boolean isEnqueued = true;
+ try {
+ messageExecutor.execute(() -> {
+ if (queueMessage.isExpired()) {
+ logger.debug("Message expired " + queueMessage.getMessage());
+ if (listener != null) {
+ listener.onMessageExpiration(queueMessage.getMessage());
+ } else {
+ logger.warn("Listener not available for expired message ");
}
+ } else {
+ queueMessage.getMessage().run();
}
});
+ } catch (RejectedExecutionException ree) {
+ isEnqueued = false;
}
- }
- public void setListener(MessageExpirationListener listener) {
- this.listener = listener;
+ return isEnqueued;
}
-
- public boolean enqueueTask(QueueMessage<? extends Runnable> queueMessage) {
- return queue.offer(queueMessage);
- }
-
}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java
index 6e9584894..067b6c3e7 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java
@@ -25,50 +25,43 @@
package org.openecomp.appc.executionqueue;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
import org.mockito.Mockito;
+import org.mockito.Spy;
import org.openecomp.appc.exceptions.APPCException;
-import org.openecomp.appc.executionqueue.ExecutionQueueService;
-import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceFactory;
-import org.powermock.api.mockito.PowerMockito;
+import org.openecomp.appc.executionqueue.helper.Util;
+import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceImpl;
+import org.openecomp.appc.executionqueue.impl.QueueManager;
+import org.powermock.modules.junit4.PowerMockRunner;
-import java.util.concurrent.TimeUnit;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+@RunWith(PowerMockRunner.class)
+public class ExecutionQueueServiceTest {
-public class TestExecutionQueueService {
+ @InjectMocks
+ private ExecutionQueueServiceImpl service;
+ @Spy
+ private QueueManager queueManager = new QueueManager();
+ @Spy
+ private Util executionQueueUtil = new Util();
- @Test
- public void testPositiveFlow(){
- Message message = new Message();
- ExecutionQueueService service = ExecutionQueueServiceFactory.getExecutionQueueService();
- try {
- service.putMessage(message);
- waitFor(5000);
- Assert.assertTrue(message.isRunExecuted());
- } catch (APPCException e) {
- Assert.fail(e.toString());
- }
+ @Before
+ public void setup() {
+ Mockito.doReturn(true).when(queueManager).enqueueTask(any());
}
-// @Test
- public void testTimeout(){
- ExecutionQueueService service = ExecutionQueueServiceFactory.getExecutionQueueService();
+ @Test
+ public void testPositiveFlow() {
Message message = new Message();
- Listener listener = new Listener();
- service.registerMessageExpirationListener(listener);
try {
- service.putMessage(message,1, TimeUnit.MILLISECONDS);
- waitFor(5000);
- Assert.assertTrue(listener.isListenerExecuted());
+ service.putMessage(message);
+ Mockito.verify(queueManager, times(1)).enqueueTask(any());
} catch (APPCException e) {
- e.printStackTrace();
- }
- }
-
- private void waitFor(long milliSeconds){
- try {
- Thread.sleep(milliSeconds);
- } catch (InterruptedException e) {
Assert.fail(e.toString());
}
}