diff options
Diffstat (limited to 'appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java')
3 files changed, 130 insertions, 95 deletions
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java index 4f97a97f9..8670adabd 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java @@ -24,49 +24,76 @@ package org.openecomp.appc.executionqueue.helper; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; - +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import org.openecomp.appc.configuration.Configuration; import org.openecomp.appc.configuration.ConfigurationFactory; +import org.openecomp.appc.executionqueue.impl.QueueManager; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; public class Util { - private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + private final EELFLogger logger = EELFManager.getInstance().getLogger(Util.class); + private final int default_queue_size = 10; + private final int default_threadpool_size = 10; + private final String queue_size_key = "appc.dispatcher.executionqueue.backlog.size"; + private final String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size"; + + private Configuration configuration; - public static int DEFAULT_QUEUE_SIZE = 10; - public static int DEFAULT_THREADPOOL_SIZE = 10; + /** + * Initialization. + * <p>Used by blueprint. + */ + public void init() { - public static int getExecutionQueSize(){ - String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.backlog.size", String.valueOf(DEFAULT_QUEUE_SIZE)); - int size = DEFAULT_QUEUE_SIZE; - try{ + configuration = ConfigurationFactory.getConfiguration(); + } + + public int getExecutionQueueSize() { + String sizeStr = configuration.getProperty(queue_size_key, String.valueOf(default_queue_size)); + + int size = default_queue_size; + try { size = Integer.parseInt(sizeStr); + } catch (NumberFormatException e) { + logger.error("Error while parse key:" + queue_size_key + " got from configuration " + e.getMessage(), e); } - catch (NumberFormatException e){ - } return size; } - public static int getThreadPoolSize(){ - String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.threadpool.size", String.valueOf(DEFAULT_THREADPOOL_SIZE)); - int size = DEFAULT_THREADPOOL_SIZE; - try{ + public int getThreadPoolSize() { + String sizeStr = configuration.getProperty(threadpool_size_key, String.valueOf(default_threadpool_size)); + + int size = default_threadpool_size; + try { size = Integer.parseInt(sizeStr); + } catch (NumberFormatException e) { + logger.error("Error while parse key:" + threadpool_size_key + " got from configuration " + + e.getMessage(), e); } - catch (NumberFormatException e){ - } return size; } - public static ThreadFactory getThreadFactory(final boolean isDaemon){ + public ThreadFactory getThreadFactory(final boolean isDaemon, final String threadNamePrefix) { return new ThreadFactory() { - final ThreadFactory factory = Executors.defaultThreadFactory(); + private final String THREAD_NAME_PATTERN = "%s-%d"; + private final ThreadFactory factory = Executors.defaultThreadFactory(); + private final AtomicInteger counter = new AtomicInteger(); + public Thread newThread(Runnable r) { Thread t = factory.newThread(r); t.setDaemon(isDaemon); + if (threadNamePrefix != null && !threadNamePrefix.isEmpty()) { + final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter + .incrementAndGet()); + t.setName(threadName); + } return t; } }; diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java index 3092bd881..c29078c27 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java @@ -24,51 +24,54 @@ package org.openecomp.appc.executionqueue.impl; -import java.time.Instant; -import java.util.Calendar; -import java.util.Date; -import java.util.concurrent.TimeUnit; - +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import org.openecomp.appc.exceptions.APPCException; import org.openecomp.appc.executionqueue.ExecutionQueueService; import org.openecomp.appc.executionqueue.MessageExpirationListener; import org.openecomp.appc.executionqueue.impl.object.QueueMessage; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; +import java.time.Instant; +import java.util.concurrent.TimeUnit; public class ExecutionQueueServiceImpl<M extends Runnable> implements ExecutionQueueService<M> { private static final EELFLogger logger = - EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); + EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); - ExecutionQueueServiceImpl(){ + private QueueManager queueManager; + + public ExecutionQueueServiceImpl() { + //do nothing + } + /** + * Injected by blueprint + * + * @param queueManager queue manager to be set + */ + public void setQueueManager(QueueManager queueManager) { + this.queueManager = queueManager; } @Override public void putMessage(M message) throws APPCException { - this.putMessage(message,-1,null); + this.putMessage(message, -1, null); } @Override - public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{ - try { - Instant expirationTime = calculateExpirationTime(timeout,unit); - QueueManager queueManager = QueueManager.getInstance(); - boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<M>(message,expirationTime)); - if(!enqueueTask){ - throw new APPCException("failed to put message in queue"); - } - } catch (Exception e) { - logger.error("Error in putMessage method of ExecutionQueueServiceImpl" + e.getMessage()); - throw new APPCException(e); + public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException { + Instant expirationTime = calculateExpirationTime(timeout, unit); + boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<>(message, expirationTime)); + if (!enqueueTask) { + logger.error("Error in putMessage method of ExecutionQueueServiceImpl"); + throw new APPCException("Failed to put message in queue"); } } @Override public void registerMessageExpirationListener(MessageExpirationListener listener) { - QueueManager.getInstance().setListener(listener); + queueManager.setListener(listener); } private Instant calculateExpirationTime(long timeToLive, TimeUnit unit) { diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java index 11d0b8d69..b78f399e0 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java @@ -24,82 +24,87 @@ package org.openecomp.appc.executionqueue.impl; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; - +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import org.openecomp.appc.executionqueue.MessageExpirationListener; import org.openecomp.appc.executionqueue.helper.Util; import org.openecomp.appc.executionqueue.impl.object.QueueMessage; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class QueueManager { - private LinkedBlockingQueue<QueueMessage<? extends Runnable>> queue; + private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class); private MessageExpirationListener listener; - - private static int MAX_QUEUE_SIZE = Util.getExecutionQueSize(); - - private static int MAX_THREAD_SIZE = Util.getThreadPoolSize(); - private ExecutorService messageExecutor; + private int max_thread_size; + private int max_queue_size; + private Util executionQueueUtil; - private static final EELFLogger logger = - EELFManager.getInstance().getLogger(QueueManager.class); + public QueueManager() { + //do nothing + } - private QueueManager(){ - init(); + /** + * Initialization method used by blueprint + */ + public void init() { + max_thread_size = executionQueueUtil.getThreadPoolSize(); + max_queue_size = executionQueueUtil.getExecutionQueueSize(); + messageExecutor = new ThreadPoolExecutor( + max_thread_size, + max_thread_size, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(max_queue_size), + executionQueueUtil.getThreadFactory(true, "appc-dispatcher"), + new ThreadPoolExecutor.AbortPolicy()); } - private static class QueueManagerHolder { - private static final QueueManager INSTANCE = new QueueManager(); + /** + * Destory method used by blueprint + */ + public void stop() { + messageExecutor.shutdownNow(); } - public static QueueManager getInstance() { - return QueueManagerHolder.INSTANCE; + public void setListener(MessageExpirationListener listener) { + this.listener = listener; } - private void init(){ - queue = new LinkedBlockingQueue<QueueMessage<? extends Runnable>>(MAX_QUEUE_SIZE); - messageExecutor = Executors.newFixedThreadPool(MAX_THREAD_SIZE,Util.getThreadFactory(true)); + /** + * Injected by blueprint + * + * @param executionQueueUtil Util to be set + */ + public void setExecutionQueueUtil(Util executionQueueUtil) { + this.executionQueueUtil = executionQueueUtil; + } - for(int i=0;i<MAX_THREAD_SIZE;i++){ - messageExecutor.submit(new Runnable() { - @Override - public void run() { - while (true){ - try{ - QueueMessage<? extends Runnable> queueMessage = queue.take(); - if (queueMessage.isExpired()) { - logger.debug("Message expired "+ queueMessage.getMessage()); - if(listener != null){ - listener.onMessageExpiration(queueMessage.getMessage()); - } - else{ - logger.warn("Listener not available for expired message "); - } - } - else{ - queueMessage.getMessage().run(); - } - } catch (Exception e) { - logger.error("Error in startMessagePolling method of ExecutionQueueServiceImpl" + e.getMessage()); - } + public boolean enqueueTask(QueueMessage queueMessage) { + boolean isEnqueued = true; + try { + messageExecutor.execute(() -> { + if (queueMessage.isExpired()) { + logger.debug("Message expired " + queueMessage.getMessage()); + if (listener != null) { + listener.onMessageExpiration(queueMessage.getMessage()); + } else { + logger.warn("Listener not available for expired message "); } + } else { + queueMessage.getMessage().run(); } }); + } catch (RejectedExecutionException ree) { + isEnqueued = false; } - } - public void setListener(MessageExpirationListener listener) { - this.listener = listener; + return isEnqueued; } - - public boolean enqueueTask(QueueMessage<? extends Runnable> queueMessage) { - return queue.offer(queueMessage); - } - } |