diff options
author | Hao Kuang <Hao.Kuang@amdocs.com> | 2017-08-18 16:07:56 +0000 |
---|---|---|
committer | Patrick Brady <pb071s@att.com> | 2017-08-18 20:16:17 +0000 |
commit | 7fec0d41c1fdfdb0eef04c6df8fb4b5368a975a5 (patch) | |
tree | e18d5951fc7888f68af1c2f059234ab6bbd7978a | |
parent | 5b7590935559fe9e286a703dc662a60b47b7ed1a (diff) |
Fix ExecutionQueue cannot be cleaned
Fix that LCM Requests in Execution Queue continue after Stop OAM
request sent.
Fix that Appc-Ansible bundle cannot be fully stopped because
of NPE.
Issue-Id: APPC-159
Change-Id: I8f0a3a79a5c572ad84e66f71b4ddb47118704302
Signed-off-by: Hao Kuang <Hao.Kuang@amdocs.com>
7 files changed, 220 insertions, 186 deletions
diff --git a/appc-adapters/appc-ansible-adapter/appc-ansible-adapter-bundle/src/main/java/org/openecomp/appc/adapter/ansible/AnsibleActivator.java b/appc-adapters/appc-ansible-adapter/appc-ansible-adapter-bundle/src/main/java/org/openecomp/appc/adapter/ansible/AnsibleActivator.java index fa81ef7f9..865841db2 100644 --- a/appc-adapters/appc-ansible-adapter/appc-ansible-adapter-bundle/src/main/java/org/openecomp/appc/adapter/ansible/AnsibleActivator.java +++ b/appc-adapters/appc-ansible-adapter/appc-ansible-adapter-bundle/src/main/java/org/openecomp/appc/adapter/ansible/AnsibleActivator.java @@ -38,9 +38,6 @@ import com.att.eelf.configuration.EELFManager; /** * This activator is used to initialize and terminate an instance of AnsibleAdapter class - * - * Author : Ashwin Sridharan - * Date : Oct 2016 */ public class AnsibleActivator implements BundleActivator { @@ -57,12 +54,12 @@ public class AnsibleActivator implements BundleActivator { /** * The logger to be used */ - private static final EELFLogger logger = EELFManager.getInstance().getLogger(AnsibleActivator.class); + private final EELFLogger logger = EELFManager.getInstance().getLogger(AnsibleActivator.class); /** * The configuration object used to configure this bundle */ - private Configuration configuration; + private final Configuration configuration = ConfigurationFactory.getConfiguration(); /** * Called when this bundle is started so the Framework can perform the bundle-specific activities necessary to start @@ -70,26 +67,24 @@ public class AnsibleActivator implements BundleActivator { * <p> * This method must complete and return to its caller in a timely manner. * </p> - * - * @param context - * The execution context of the bundle being started. - * @throws java.lang.Exception - * If this method throws an exception, this bundle is marked as stopped and the Framework will remove - * this bundle's listeners, unregister all services registered by this bundle, and release all services - * used by this bundle. + * + * @param context The execution context of the bundle being started. + * @throws java.lang.Exception If this method throws an exception, this bundle is marked as stopped and the + * Framework will remove this bundle's listeners, unregister all services registered + * by this bundle, and release all services used by this bundle. * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext) */ @Override public void start(BundleContext context) throws Exception { logger.info("Starting bundle " + getName()); - String appName = "APPC: "; + String appName = "APPC: "; logger.info(Msg.COMPONENT_INITIALIZING, appName, "Ansible Adapter"); - adapter = new AnsibleAdapterImpl(); - + adapter = new AnsibleAdapterImpl(); + if (registration == null) { logger.info(Msg.REGISTERING_SERVICE, appName, adapter.getAdapterName(), - AnsibleAdapter.class.getSimpleName()); + AnsibleAdapter.class.getSimpleName()); registration = context.registerService(AnsibleAdapter.class, adapter, null); } @@ -104,13 +99,11 @@ public class AnsibleActivator implements BundleActivator { * <p> * This method must complete and return to its caller in a timely manner. * </p> - * - * @param context - * The execution context of the bundle being stopped. - * @throws java.lang.Exception - * If this method throws an exception, the bundle is still marked as stopped, and the Framework will - * remove the bundle's listeners, unregister all services registered by the bundle, and release all - * services used by the bundle. * + * + * @param context The execution context of the bundle being stopped. + * @throws java.lang.Exception If this method throws an exception, the bundle is still marked as stopped, and the + * Framework will remove the bundle's listeners, unregister all services registered + * by the bundle, and release all services used by the bundle. * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext) */ @Override @@ -130,5 +123,4 @@ public class AnsibleActivator implements BundleActivator { public String getName() { return "APPC Ansible Adapter"; } - } diff --git a/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/java/org/openecomp/appc/executor/impl/CommandExecutorImpl.java b/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/java/org/openecomp/appc/executor/impl/CommandExecutorImpl.java index 5054d34c7..f7ffdadce 100644 --- a/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/java/org/openecomp/appc/executor/impl/CommandExecutorImpl.java +++ b/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/java/org/openecomp/appc/executor/impl/CommandExecutorImpl.java @@ -28,45 +28,50 @@ package org.openecomp.appc.executor.impl; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.concurrent.TimeUnit; - +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import org.apache.commons.lang.ObjectUtils; import org.openecomp.appc.domainmodel.lcm.RuntimeContext; -import org.openecomp.appc.domainmodel.lcm.ActionLevel; import org.openecomp.appc.exceptions.APPCException; import org.openecomp.appc.executionqueue.ExecutionQueueService; -import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceFactory; import org.openecomp.appc.executor.CommandExecutor; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.TimeUnit; public class CommandExecutorImpl implements CommandExecutor { - private CommandTaskFactory executionTaskFactory ; + private CommandTaskFactory executionTaskFactory; private static final EELFLogger logger = EELFManager.getInstance().getLogger(CommandExecutorImpl.class); private ExecutionQueueService executionQueueService; private ExpiredMessageHandler expiredMessageHandler; - public CommandExecutorImpl(){ + public CommandExecutorImpl() { } + /** + * Injected by blueprint + * + * @param executionQueueService + */ public void setExecutionQueueService(ExecutionQueueService executionQueueService) { this.executionQueueService = executionQueueService; } + /** + * Injected by blueprint + * @param expiredMessageHandler + */ public void setExpiredMessageHandler(ExpiredMessageHandler expiredMessageHandler) { this.expiredMessageHandler = expiredMessageHandler; } public void initialize() { logger.info("initialization started of CommandExecutorImpl"); - executionQueueService = ExecutionQueueServiceFactory.getExecutionQueueService(); executionQueueService.registerMessageExpirationListener(expiredMessageHandler); } @@ -77,13 +82,14 @@ public class CommandExecutorImpl implements CommandExecutor { /** * Execute given command * Create command request and enqueue it for execution. + * * @param commandExecutorInput Contains CommandHeader, command , target Id , payload and conf ID (optional) * @throws APPCException in case of error. */ @Override - public void executeCommand (RuntimeContext commandExecutorInput) throws APPCException{ + public void executeCommand(RuntimeContext commandExecutorInput) throws APPCException { if (logger.isTraceEnabled()) { - logger.trace("Entering to executeCommand with CommandExecutorInput = "+ ObjectUtils.toString(commandExecutorInput)); + logger.trace("Entering to executeCommand with CommandExecutorInput = " + ObjectUtils.toString(commandExecutorInput)); } enqueRequest(commandExecutorInput); if (logger.isTraceEnabled()) { @@ -91,30 +97,31 @@ public class CommandExecutorImpl implements CommandExecutor { } } - private RuntimeContext getCommandRequest(RuntimeContext commandExecutorInput){ + private RuntimeContext getCommandRequest(RuntimeContext commandExecutorInput) { if (logger.isTraceEnabled()) { - logger.trace("Entering to getCommandRequest with CommandExecutorInput = "+ ObjectUtils.toString(commandExecutorInput)); + logger.trace("Entering to getCommandRequest with CommandExecutorInput = " + ObjectUtils.toString(commandExecutorInput)); } RuntimeContext commandRequest; commandRequest = commandExecutorInput; if (logger.isTraceEnabled()) { - logger.trace("Exiting from getCommandRequest with (CommandRequest = "+ ObjectUtils.toString(commandRequest)+")"); + logger.trace("Exiting from getCommandRequest with (CommandRequest = " + ObjectUtils.toString(commandRequest) + ")"); } return commandRequest; } @SuppressWarnings("unchecked") - private void enqueRequest(RuntimeContext request) throws APPCException{ + private void enqueRequest(RuntimeContext request) throws APPCException { if (logger.isTraceEnabled()) { - logger.trace("Entering to enqueRequest with CommandRequest = "+ ObjectUtils.toString(request)); + logger.trace("Entering to enqueRequest with CommandRequest = " + ObjectUtils.toString(request)); } try { - String action = request.getRequestContext().getAction().name(); CommandTask commandTask = executionTaskFactory.getExecutionTask(request); + long remainingTTL = getRemainingTTL(request); - executionQueueService.putMessage(commandTask,remainingTTL, TimeUnit.MILLISECONDS); + + executionQueueService.putMessage(commandTask, remainingTTL, TimeUnit.MILLISECONDS); } catch (Exception e) { - logger.error("Exception: "+e.getMessage()); + logger.error("Exception: " + e.getMessage()); throw new APPCException(e); } @@ -129,9 +136,9 @@ public class CommandExecutorImpl implements CommandExecutor { return ChronoUnit.MILLIS.between(Instant.now(), requestTimestamp.plusSeconds(ttl)); } - private CommandTask getMessageExecutor(RuntimeContext request){ + private CommandTask getMessageExecutor(RuntimeContext request) { if (logger.isTraceEnabled()) { - logger.trace("Entering to getMessageExecutor with command = "+ request); + logger.trace("Entering to getMessageExecutor with command = " + request); } CommandTask executionTask = executionTaskFactory.getExecutionTask(request); if (logger.isTraceEnabled()) { @@ -139,6 +146,4 @@ public class CommandExecutorImpl implements CommandExecutor { } return executionTask; } - - } diff --git a/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index f84e97207..5474dcc6f 100644 --- a/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -31,27 +31,36 @@ xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd"> <bean id="commandExecutorBean" class="org.openecomp.appc.executor.impl.CommandExecutorImpl" scope="singleton" init-method="initialize"> + <property name="executionQueueService" ref="ExecutionQueueServiceRef"/> <property name="executionTaskFactory" ref="CommandExecutionTaskFactory"/> <property name="expiredMessageHandler" ref="expiredMessageHandlerBean"/> </bean> - <bean id="CommandExecutionTaskFactory" class="org.openecomp.appc.executor.impl.CommandTaskFactory" scope="singleton" > - <property name="vnfRequestHandler" ref="vnfRequestHandlerService" /> + <bean id="CommandExecutionTaskFactory" class="org.openecomp.appc.executor.impl.CommandTaskFactory" + scope="singleton"> + <property name="vnfRequestHandler" ref="vnfRequestHandlerService"/> <property name="vmRequestHandler" ref="vmRequestHandlerService"/> - <property name="workflowManager" ref="WorkFlowManagerRef" /> - <property name="lifecyclemanager" ref="LifecyclemanagerRef" /> + <property name="workflowManager" ref="WorkFlowManagerRef"/> + <property name="lifecyclemanager" ref="LifecyclemanagerRef"/> </bean> <bean id="expiredMessageHandlerBean" class="org.openecomp.appc.executor.impl.ExpiredMessageHandler" scope="singleton"> - <property name="vnfRequestHandler" ref="vnfRequestHandlerService" /> + <property name="vnfRequestHandler" ref="vnfRequestHandlerService"/> <property name="vmRequestHandler" ref="vmRequestHandlerService"/> </bean> - <reference id="WorkFlowManagerRef" availability="mandatory" activation="eager" interface="org.openecomp.appc.workflow.WorkFlowManager" /> - <reference id="vnfRequestHandlerService" availability="optional" activation="eager" interface="org.openecomp.appc.requesthandler.RequestHandler" filter="(level=VNF)" /> - <reference id="vmRequestHandlerService" availability="optional" activation="eager" interface="org.openecomp.appc.requesthandler.RequestHandler" filter="(level=VM)" /> - <reference id="LifecyclemanagerRef" availability="mandatory" activation="eager" interface="org.openecomp.appc.lifecyclemanager.LifecycleManager" /> + <reference id="WorkFlowManagerRef" availability="mandatory" activation="eager" + interface="org.openecomp.appc.workflow.WorkFlowManager"/> + <reference id="vnfRequestHandlerService" availability="optional" activation="eager" + interface="org.openecomp.appc.requesthandler.RequestHandler" filter="(level=VNF)"/> + <reference id="vmRequestHandlerService" availability="optional" activation="eager" + interface="org.openecomp.appc.requesthandler.RequestHandler" filter="(level=VM)"/> + <reference id="LifecyclemanagerRef" availability="mandatory" activation="eager" + interface="org.openecomp.appc.lifecyclemanager.LifecycleManager"/> + <reference id="ExecutionQueueServiceRef" availability="mandatory" activation="eager" + interface="org.openecomp.appc.executionqueue.ExecutionQueueService"/> - <service id="commandExecutorService" interface="org.openecomp.appc.executor.CommandExecutor" ref="commandExecutorBean"/> + <service id="commandExecutorService" interface="org.openecomp.appc.executor.CommandExecutor" + ref="commandExecutorBean"/> </blueprint> diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java index 4f97a97f9..8670adabd 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java @@ -24,49 +24,76 @@ package org.openecomp.appc.executionqueue.helper; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; - +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import org.openecomp.appc.configuration.Configuration; import org.openecomp.appc.configuration.ConfigurationFactory; +import org.openecomp.appc.executionqueue.impl.QueueManager; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; public class Util { - private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + private final EELFLogger logger = EELFManager.getInstance().getLogger(Util.class); + private final int default_queue_size = 10; + private final int default_threadpool_size = 10; + private final String queue_size_key = "appc.dispatcher.executionqueue.backlog.size"; + private final String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size"; + + private Configuration configuration; - public static int DEFAULT_QUEUE_SIZE = 10; - public static int DEFAULT_THREADPOOL_SIZE = 10; + /** + * Initialization. + * <p>Used by blueprint. + */ + public void init() { - public static int getExecutionQueSize(){ - String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.backlog.size", String.valueOf(DEFAULT_QUEUE_SIZE)); - int size = DEFAULT_QUEUE_SIZE; - try{ + configuration = ConfigurationFactory.getConfiguration(); + } + + public int getExecutionQueueSize() { + String sizeStr = configuration.getProperty(queue_size_key, String.valueOf(default_queue_size)); + + int size = default_queue_size; + try { size = Integer.parseInt(sizeStr); + } catch (NumberFormatException e) { + logger.error("Error while parse key:" + queue_size_key + " got from configuration " + e.getMessage(), e); } - catch (NumberFormatException e){ - } return size; } - public static int getThreadPoolSize(){ - String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.threadpool.size", String.valueOf(DEFAULT_THREADPOOL_SIZE)); - int size = DEFAULT_THREADPOOL_SIZE; - try{ + public int getThreadPoolSize() { + String sizeStr = configuration.getProperty(threadpool_size_key, String.valueOf(default_threadpool_size)); + + int size = default_threadpool_size; + try { size = Integer.parseInt(sizeStr); + } catch (NumberFormatException e) { + logger.error("Error while parse key:" + threadpool_size_key + " got from configuration " + + e.getMessage(), e); } - catch (NumberFormatException e){ - } return size; } - public static ThreadFactory getThreadFactory(final boolean isDaemon){ + public ThreadFactory getThreadFactory(final boolean isDaemon, final String threadNamePrefix) { return new ThreadFactory() { - final ThreadFactory factory = Executors.defaultThreadFactory(); + private final String THREAD_NAME_PATTERN = "%s-%d"; + private final ThreadFactory factory = Executors.defaultThreadFactory(); + private final AtomicInteger counter = new AtomicInteger(); + public Thread newThread(Runnable r) { Thread t = factory.newThread(r); t.setDaemon(isDaemon); + if (threadNamePrefix != null && !threadNamePrefix.isEmpty()) { + final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter + .incrementAndGet()); + t.setName(threadName); + } return t; } }; diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java index 3092bd881..c29078c27 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java @@ -24,51 +24,54 @@ package org.openecomp.appc.executionqueue.impl; -import java.time.Instant; -import java.util.Calendar; -import java.util.Date; -import java.util.concurrent.TimeUnit; - +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import org.openecomp.appc.exceptions.APPCException; import org.openecomp.appc.executionqueue.ExecutionQueueService; import org.openecomp.appc.executionqueue.MessageExpirationListener; import org.openecomp.appc.executionqueue.impl.object.QueueMessage; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; +import java.time.Instant; +import java.util.concurrent.TimeUnit; public class ExecutionQueueServiceImpl<M extends Runnable> implements ExecutionQueueService<M> { private static final EELFLogger logger = - EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); + EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); - ExecutionQueueServiceImpl(){ + private QueueManager queueManager; + + public ExecutionQueueServiceImpl() { + //do nothing + } + /** + * Injected by blueprint + * + * @param queueManager queue manager to be set + */ + public void setQueueManager(QueueManager queueManager) { + this.queueManager = queueManager; } @Override public void putMessage(M message) throws APPCException { - this.putMessage(message,-1,null); + this.putMessage(message, -1, null); } @Override - public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{ - try { - Instant expirationTime = calculateExpirationTime(timeout,unit); - QueueManager queueManager = QueueManager.getInstance(); - boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<M>(message,expirationTime)); - if(!enqueueTask){ - throw new APPCException("failed to put message in queue"); - } - } catch (Exception e) { - logger.error("Error in putMessage method of ExecutionQueueServiceImpl" + e.getMessage()); - throw new APPCException(e); + public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException { + Instant expirationTime = calculateExpirationTime(timeout, unit); + boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<>(message, expirationTime)); + if (!enqueueTask) { + logger.error("Error in putMessage method of ExecutionQueueServiceImpl"); + throw new APPCException("Failed to put message in queue"); } } @Override public void registerMessageExpirationListener(MessageExpirationListener listener) { - QueueManager.getInstance().setListener(listener); + queueManager.setListener(listener); } private Instant calculateExpirationTime(long timeToLive, TimeUnit unit) { diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java index 11d0b8d69..b78f399e0 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java @@ -24,82 +24,87 @@ package org.openecomp.appc.executionqueue.impl; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; - +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import org.openecomp.appc.executionqueue.MessageExpirationListener; import org.openecomp.appc.executionqueue.helper.Util; import org.openecomp.appc.executionqueue.impl.object.QueueMessage; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class QueueManager { - private LinkedBlockingQueue<QueueMessage<? extends Runnable>> queue; + private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class); private MessageExpirationListener listener; - - private static int MAX_QUEUE_SIZE = Util.getExecutionQueSize(); - - private static int MAX_THREAD_SIZE = Util.getThreadPoolSize(); - private ExecutorService messageExecutor; + private int max_thread_size; + private int max_queue_size; + private Util executionQueueUtil; - private static final EELFLogger logger = - EELFManager.getInstance().getLogger(QueueManager.class); + public QueueManager() { + //do nothing + } - private QueueManager(){ - init(); + /** + * Initialization method used by blueprint + */ + public void init() { + max_thread_size = executionQueueUtil.getThreadPoolSize(); + max_queue_size = executionQueueUtil.getExecutionQueueSize(); + messageExecutor = new ThreadPoolExecutor( + max_thread_size, + max_thread_size, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(max_queue_size), + executionQueueUtil.getThreadFactory(true, "appc-dispatcher"), + new ThreadPoolExecutor.AbortPolicy()); } - private static class QueueManagerHolder { - private static final QueueManager INSTANCE = new QueueManager(); + /** + * Destory method used by blueprint + */ + public void stop() { + messageExecutor.shutdownNow(); } - public static QueueManager getInstance() { - return QueueManagerHolder.INSTANCE; + public void setListener(MessageExpirationListener listener) { + this.listener = listener; } - private void init(){ - queue = new LinkedBlockingQueue<QueueMessage<? extends Runnable>>(MAX_QUEUE_SIZE); - messageExecutor = Executors.newFixedThreadPool(MAX_THREAD_SIZE,Util.getThreadFactory(true)); + /** + * Injected by blueprint + * + * @param executionQueueUtil Util to be set + */ + public void setExecutionQueueUtil(Util executionQueueUtil) { + this.executionQueueUtil = executionQueueUtil; + } - for(int i=0;i<MAX_THREAD_SIZE;i++){ - messageExecutor.submit(new Runnable() { - @Override - public void run() { - while (true){ - try{ - QueueMessage<? extends Runnable> queueMessage = queue.take(); - if (queueMessage.isExpired()) { - logger.debug("Message expired "+ queueMessage.getMessage()); - if(listener != null){ - listener.onMessageExpiration(queueMessage.getMessage()); - } - else{ - logger.warn("Listener not available for expired message "); - } - } - else{ - queueMessage.getMessage().run(); - } - } catch (Exception e) { - logger.error("Error in startMessagePolling method of ExecutionQueueServiceImpl" + e.getMessage()); - } + public boolean enqueueTask(QueueMessage queueMessage) { + boolean isEnqueued = true; + try { + messageExecutor.execute(() -> { + if (queueMessage.isExpired()) { + logger.debug("Message expired " + queueMessage.getMessage()); + if (listener != null) { + listener.onMessageExpiration(queueMessage.getMessage()); + } else { + logger.warn("Listener not available for expired message "); } + } else { + queueMessage.getMessage().run(); } }); + } catch (RejectedExecutionException ree) { + isEnqueued = false; } - } - public void setListener(MessageExpirationListener listener) { - this.listener = listener; + return isEnqueued; } - - public boolean enqueueTask(QueueMessage<? extends Runnable> queueMessage) { - return queue.offer(queueMessage); - } - } diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java index 6e9584894..067b6c3e7 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java @@ -25,50 +25,43 @@ package org.openecomp.appc.executionqueue; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; import org.mockito.Mockito; +import org.mockito.Spy; import org.openecomp.appc.exceptions.APPCException; -import org.openecomp.appc.executionqueue.ExecutionQueueService; -import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceFactory; -import org.powermock.api.mockito.PowerMockito; +import org.openecomp.appc.executionqueue.helper.Util; +import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceImpl; +import org.openecomp.appc.executionqueue.impl.QueueManager; +import org.powermock.modules.junit4.PowerMockRunner; -import java.util.concurrent.TimeUnit; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; +@RunWith(PowerMockRunner.class) +public class ExecutionQueueServiceTest { -public class TestExecutionQueueService { + @InjectMocks + private ExecutionQueueServiceImpl service; + @Spy + private QueueManager queueManager = new QueueManager(); + @Spy + private Util executionQueueUtil = new Util(); - @Test - public void testPositiveFlow(){ - Message message = new Message(); - ExecutionQueueService service = ExecutionQueueServiceFactory.getExecutionQueueService(); - try { - service.putMessage(message); - waitFor(5000); - Assert.assertTrue(message.isRunExecuted()); - } catch (APPCException e) { - Assert.fail(e.toString()); - } + @Before + public void setup() { + Mockito.doReturn(true).when(queueManager).enqueueTask(any()); } -// @Test - public void testTimeout(){ - ExecutionQueueService service = ExecutionQueueServiceFactory.getExecutionQueueService(); + @Test + public void testPositiveFlow() { Message message = new Message(); - Listener listener = new Listener(); - service.registerMessageExpirationListener(listener); try { - service.putMessage(message,1, TimeUnit.MILLISECONDS); - waitFor(5000); - Assert.assertTrue(listener.isListenerExecuted()); + service.putMessage(message); + Mockito.verify(queueManager, times(1)).enqueueTask(any()); } catch (APPCException e) { - e.printStackTrace(); - } - } - - private void waitFor(long milliSeconds){ - try { - Thread.sleep(milliSeconds); - } catch (InterruptedException e) { Assert.fail(e.toString()); } } |