From 7fec0d41c1fdfdb0eef04c6df8fb4b5368a975a5 Mon Sep 17 00:00:00 2001 From: Hao Kuang Date: Fri, 18 Aug 2017 16:07:56 +0000 Subject: Fix ExecutionQueue cannot be cleaned Fix that LCM Requests in Execution Queue continue after Stop OAM request sent. Fix that Appc-Ansible bundle cannot be fully stopped because of NPE. Issue-Id: APPC-159 Change-Id: I8f0a3a79a5c572ad84e66f71b4ddb47118704302 Signed-off-by: Hao Kuang --- .../openecomp/appc/executionqueue/helper/Util.java | 67 +++++++++---- .../impl/ExecutionQueueServiceImpl.java | 47 +++++---- .../appc/executionqueue/impl/QueueManager.java | 111 +++++++++++---------- .../executionqueue/ExecutionQueueServiceTest.java | 68 +++++++++++++ .../executionqueue/TestExecutionQueueService.java | 75 -------------- 5 files changed, 198 insertions(+), 170 deletions(-) create mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java delete mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java (limited to 'appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src') diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java index 4f97a97f9..8670adabd 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java @@ -24,49 +24,76 @@ package org.openecomp.appc.executionqueue.helper; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; - +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import org.openecomp.appc.configuration.Configuration; import org.openecomp.appc.configuration.ConfigurationFactory; +import org.openecomp.appc.executionqueue.impl.QueueManager; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; public class Util { - private static final Configuration configuration = ConfigurationFactory.getConfiguration(); + private final EELFLogger logger = EELFManager.getInstance().getLogger(Util.class); + private final int default_queue_size = 10; + private final int default_threadpool_size = 10; + private final String queue_size_key = "appc.dispatcher.executionqueue.backlog.size"; + private final String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size"; + + private Configuration configuration; - public static int DEFAULT_QUEUE_SIZE = 10; - public static int DEFAULT_THREADPOOL_SIZE = 10; + /** + * Initialization. + *

Used by blueprint. + */ + public void init() { - public static int getExecutionQueSize(){ - String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.backlog.size", String.valueOf(DEFAULT_QUEUE_SIZE)); - int size = DEFAULT_QUEUE_SIZE; - try{ + configuration = ConfigurationFactory.getConfiguration(); + } + + public int getExecutionQueueSize() { + String sizeStr = configuration.getProperty(queue_size_key, String.valueOf(default_queue_size)); + + int size = default_queue_size; + try { size = Integer.parseInt(sizeStr); + } catch (NumberFormatException e) { + logger.error("Error while parse key:" + queue_size_key + " got from configuration " + e.getMessage(), e); } - catch (NumberFormatException e){ - } return size; } - public static int getThreadPoolSize(){ - String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.threadpool.size", String.valueOf(DEFAULT_THREADPOOL_SIZE)); - int size = DEFAULT_THREADPOOL_SIZE; - try{ + public int getThreadPoolSize() { + String sizeStr = configuration.getProperty(threadpool_size_key, String.valueOf(default_threadpool_size)); + + int size = default_threadpool_size; + try { size = Integer.parseInt(sizeStr); + } catch (NumberFormatException e) { + logger.error("Error while parse key:" + threadpool_size_key + " got from configuration " + + e.getMessage(), e); } - catch (NumberFormatException e){ - } return size; } - public static ThreadFactory getThreadFactory(final boolean isDaemon){ + public ThreadFactory getThreadFactory(final boolean isDaemon, final String threadNamePrefix) { return new ThreadFactory() { - final ThreadFactory factory = Executors.defaultThreadFactory(); + private final String THREAD_NAME_PATTERN = "%s-%d"; + private final ThreadFactory factory = Executors.defaultThreadFactory(); + private final AtomicInteger counter = new AtomicInteger(); + public Thread newThread(Runnable r) { Thread t = factory.newThread(r); t.setDaemon(isDaemon); + if (threadNamePrefix != null && !threadNamePrefix.isEmpty()) { + final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter + .incrementAndGet()); + t.setName(threadName); + } return t; } }; diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java index 3092bd881..c29078c27 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java @@ -24,51 +24,54 @@ package org.openecomp.appc.executionqueue.impl; -import java.time.Instant; -import java.util.Calendar; -import java.util.Date; -import java.util.concurrent.TimeUnit; - +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import org.openecomp.appc.exceptions.APPCException; import org.openecomp.appc.executionqueue.ExecutionQueueService; import org.openecomp.appc.executionqueue.MessageExpirationListener; import org.openecomp.appc.executionqueue.impl.object.QueueMessage; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; +import java.time.Instant; +import java.util.concurrent.TimeUnit; public class ExecutionQueueServiceImpl implements ExecutionQueueService { private static final EELFLogger logger = - EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); + EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); - ExecutionQueueServiceImpl(){ + private QueueManager queueManager; + + public ExecutionQueueServiceImpl() { + //do nothing + } + /** + * Injected by blueprint + * + * @param queueManager queue manager to be set + */ + public void setQueueManager(QueueManager queueManager) { + this.queueManager = queueManager; } @Override public void putMessage(M message) throws APPCException { - this.putMessage(message,-1,null); + this.putMessage(message, -1, null); } @Override - public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{ - try { - Instant expirationTime = calculateExpirationTime(timeout,unit); - QueueManager queueManager = QueueManager.getInstance(); - boolean enqueueTask = queueManager.enqueueTask(new QueueMessage(message,expirationTime)); - if(!enqueueTask){ - throw new APPCException("failed to put message in queue"); - } - } catch (Exception e) { - logger.error("Error in putMessage method of ExecutionQueueServiceImpl" + e.getMessage()); - throw new APPCException(e); + public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException { + Instant expirationTime = calculateExpirationTime(timeout, unit); + boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<>(message, expirationTime)); + if (!enqueueTask) { + logger.error("Error in putMessage method of ExecutionQueueServiceImpl"); + throw new APPCException("Failed to put message in queue"); } } @Override public void registerMessageExpirationListener(MessageExpirationListener listener) { - QueueManager.getInstance().setListener(listener); + queueManager.setListener(listener); } private Instant calculateExpirationTime(long timeToLive, TimeUnit unit) { diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java index 11d0b8d69..b78f399e0 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java @@ -24,82 +24,87 @@ package org.openecomp.appc.executionqueue.impl; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; - +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import org.openecomp.appc.executionqueue.MessageExpirationListener; import org.openecomp.appc.executionqueue.helper.Util; import org.openecomp.appc.executionqueue.impl.object.QueueMessage; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class QueueManager { - private LinkedBlockingQueue> queue; + private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class); private MessageExpirationListener listener; - - private static int MAX_QUEUE_SIZE = Util.getExecutionQueSize(); - - private static int MAX_THREAD_SIZE = Util.getThreadPoolSize(); - private ExecutorService messageExecutor; + private int max_thread_size; + private int max_queue_size; + private Util executionQueueUtil; - private static final EELFLogger logger = - EELFManager.getInstance().getLogger(QueueManager.class); + public QueueManager() { + //do nothing + } - private QueueManager(){ - init(); + /** + * Initialization method used by blueprint + */ + public void init() { + max_thread_size = executionQueueUtil.getThreadPoolSize(); + max_queue_size = executionQueueUtil.getExecutionQueueSize(); + messageExecutor = new ThreadPoolExecutor( + max_thread_size, + max_thread_size, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(max_queue_size), + executionQueueUtil.getThreadFactory(true, "appc-dispatcher"), + new ThreadPoolExecutor.AbortPolicy()); } - private static class QueueManagerHolder { - private static final QueueManager INSTANCE = new QueueManager(); + /** + * Destory method used by blueprint + */ + public void stop() { + messageExecutor.shutdownNow(); } - public static QueueManager getInstance() { - return QueueManagerHolder.INSTANCE; + public void setListener(MessageExpirationListener listener) { + this.listener = listener; } - private void init(){ - queue = new LinkedBlockingQueue>(MAX_QUEUE_SIZE); - messageExecutor = Executors.newFixedThreadPool(MAX_THREAD_SIZE,Util.getThreadFactory(true)); + /** + * Injected by blueprint + * + * @param executionQueueUtil Util to be set + */ + public void setExecutionQueueUtil(Util executionQueueUtil) { + this.executionQueueUtil = executionQueueUtil; + } - for(int i=0;i queueMessage = queue.take(); - if (queueMessage.isExpired()) { - logger.debug("Message expired "+ queueMessage.getMessage()); - if(listener != null){ - listener.onMessageExpiration(queueMessage.getMessage()); - } - else{ - logger.warn("Listener not available for expired message "); - } - } - else{ - queueMessage.getMessage().run(); - } - } catch (Exception e) { - logger.error("Error in startMessagePolling method of ExecutionQueueServiceImpl" + e.getMessage()); - } + public boolean enqueueTask(QueueMessage queueMessage) { + boolean isEnqueued = true; + try { + messageExecutor.execute(() -> { + if (queueMessage.isExpired()) { + logger.debug("Message expired " + queueMessage.getMessage()); + if (listener != null) { + listener.onMessageExpiration(queueMessage.getMessage()); + } else { + logger.warn("Listener not available for expired message "); } + } else { + queueMessage.getMessage().run(); } }); + } catch (RejectedExecutionException ree) { + isEnqueued = false; } - } - public void setListener(MessageExpirationListener listener) { - this.listener = listener; + return isEnqueued; } - - public boolean enqueueTask(QueueMessage queueMessage) { - return queue.offer(queueMessage); - } - } diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java new file mode 100644 index 000000000..067b6c3e7 --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java @@ -0,0 +1,68 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.appc.executionqueue; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.openecomp.appc.exceptions.APPCException; +import org.openecomp.appc.executionqueue.helper.Util; +import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceImpl; +import org.openecomp.appc.executionqueue.impl.QueueManager; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; + +@RunWith(PowerMockRunner.class) +public class ExecutionQueueServiceTest { + + @InjectMocks + private ExecutionQueueServiceImpl service; + @Spy + private QueueManager queueManager = new QueueManager(); + @Spy + private Util executionQueueUtil = new Util(); + + @Before + public void setup() { + Mockito.doReturn(true).when(queueManager).enqueueTask(any()); + } + + @Test + public void testPositiveFlow() { + Message message = new Message(); + try { + service.putMessage(message); + Mockito.verify(queueManager, times(1)).enqueueTask(any()); + } catch (APPCException e) { + Assert.fail(e.toString()); + } + } +} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java deleted file mode 100644 index 6e9584894..000000000 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java +++ /dev/null @@ -1,75 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.appc.executionqueue; - -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; -import org.openecomp.appc.exceptions.APPCException; -import org.openecomp.appc.executionqueue.ExecutionQueueService; -import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceFactory; -import org.powermock.api.mockito.PowerMockito; - -import java.util.concurrent.TimeUnit; - - -public class TestExecutionQueueService { - - @Test - public void testPositiveFlow(){ - Message message = new Message(); - ExecutionQueueService service = ExecutionQueueServiceFactory.getExecutionQueueService(); - try { - service.putMessage(message); - waitFor(5000); - Assert.assertTrue(message.isRunExecuted()); - } catch (APPCException e) { - Assert.fail(e.toString()); - } - } - -// @Test - public void testTimeout(){ - ExecutionQueueService service = ExecutionQueueServiceFactory.getExecutionQueueService(); - Message message = new Message(); - Listener listener = new Listener(); - service.registerMessageExpirationListener(listener); - try { - service.putMessage(message,1, TimeUnit.MILLISECONDS); - waitFor(5000); - Assert.assertTrue(listener.isListenerExecuted()); - } catch (APPCException e) { - e.printStackTrace(); - } - } - - private void waitFor(long milliSeconds){ - try { - Thread.sleep(milliSeconds); - } catch (InterruptedException e) { - Assert.fail(e.toString()); - } - } -} -- cgit 1.2.3-korg