diff options
Diffstat (limited to 'appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src')
8 files changed, 60 insertions, 155 deletions
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java index 1423962ef..2c4aa0853 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java @@ -31,5 +31,4 @@ import org.onap.appc.exceptions.APPCException; public interface ExecutionQueueService<M extends Runnable> { void putMessage(M message) throws APPCException; void putMessage(M message, long timeout, TimeUnit unit) throws APPCException; - void registerMessageExpirationListener(MessageExpirationListener listener); } diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java index 164a8b563..09d49deef 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java @@ -24,8 +24,6 @@ package org.onap.appc.executionqueue.helper; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; import org.onap.appc.configuration.Configuration; import org.onap.appc.configuration.ConfigurationFactory; @@ -35,11 +33,10 @@ import java.util.concurrent.atomic.AtomicInteger; public class Util { - private final EELFLogger logger = EELFManager.getInstance().getLogger(Util.class); - private final int default_queue_size = 10; - private final int default_threadpool_size = 10; - private final String queue_size_key = "appc.dispatcher.executionqueue.backlog.size"; - private final String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size"; + private int default_queue_size = 10; + private int default_threadpool_size = 10; + private String queue_size_key = "appc.dispatcher.executionqueue.backlog.size"; + private String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size"; private Configuration configuration; @@ -48,7 +45,6 @@ public class Util { * <p>Used by blueprint. */ public void init() { - configuration = ConfigurationFactory.getConfiguration(); } @@ -59,7 +55,7 @@ public class Util { try { size = Integer.parseInt(sizeStr); } catch (NumberFormatException e) { - logger.error("Error while parse key:" + queue_size_key + " got from configuration " + e.getMessage(), e); + } return size; @@ -72,8 +68,7 @@ public class Util { try { size = Integer.parseInt(sizeStr); } catch (NumberFormatException e) { - logger.error("Error while parse key:" + threadpool_size_key + " got from configuration " - + e.getMessage(), e); + } return size; @@ -89,8 +84,7 @@ public class Util { Thread t = factory.newThread(r); t.setDaemon(isDaemon); if (threadNamePrefix != null && !threadNamePrefix.isEmpty()) { - final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter - .incrementAndGet()); + final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter.incrementAndGet()); t.setName(threadName); } return t; diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java deleted file mode 100644 index f071be6f0..000000000 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.executionqueue.impl; - -import org.onap.appc.executionqueue.ExecutionQueueService; - -public class ExecutionQueueServiceFactory { - - private static class ExecutionQueueServiceHolder { - public static final ExecutionQueueService executionQueueService = new ExecutionQueueServiceImpl(); - } - - public static ExecutionQueueService getExecutionQueueService() { - return ExecutionQueueServiceHolder.executionQueueService; - } -} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java index 0634a0eb2..027cc9d4b 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java @@ -31,58 +31,59 @@ import org.onap.appc.executionqueue.ExecutionQueueService; import org.onap.appc.executionqueue.MessageExpirationListener; import org.onap.appc.executionqueue.impl.object.QueueMessage; -import java.time.Instant; +import java.util.Calendar; +import java.util.Date; import java.util.concurrent.TimeUnit; public class ExecutionQueueServiceImpl<M extends Runnable> implements ExecutionQueueService<M> { - private static final EELFLogger logger = - EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); + private final EELFLogger logger = EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); private QueueManager queueManager; - public ExecutionQueueServiceImpl() { - //do nothing + public ExecutionQueueServiceImpl(){ + + } + + @Override + public void putMessage(M message) throws APPCException { + this.putMessage(message,-1,null); } /** * Injected by blueprint - * - * @param queueManager queue manager to be set + * @param queueManager */ public void setQueueManager(QueueManager queueManager) { this.queueManager = queueManager; } @Override - public void putMessage(M message) throws APPCException { - this.putMessage(message, -1, null); - } + public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{ + QueueMessage queueMessage; - @Override - public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException { - Instant expirationTime = calculateExpirationTime(timeout, unit); - boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<>(message, expirationTime)); - if (!enqueueTask) { - logger.error("Error in putMessage method of ExecutionQueueServiceImpl"); - throw new APPCException("Failed to put message in queue"); + try { + Date expirationTime = calculateExpirationTime(timeout,unit); + queueMessage = new QueueMessage(message,expirationTime); + boolean enqueueTask = queueManager.enqueueTask(queueMessage); + if(!enqueueTask){ + throw new APPCException("failed to put message in queue"); + } + } catch (Exception e) { + logger.error("Error in putMessage method of ExecutionQueueServiceImpl" + e.getMessage()); + throw new APPCException(e); } } - @Override - public void registerMessageExpirationListener(MessageExpirationListener listener) { - queueManager.setListener(listener); - } - - private Instant calculateExpirationTime(long timeToLive, TimeUnit unit) { - if (timeToLive > 0 && unit != null) { - // as of Java 8, there is no built-in conversion method from - // TimeUnit to ChronoUnit; do it manually - return Instant.now().plusMillis(unit.toMillis(timeToLive)); - } else { - // never expires - return Instant.MAX; + private Date calculateExpirationTime(long timeToLive, TimeUnit unit) { + Date expirationTime = null; + if(timeToLive > 0){ + long currentTime = System.currentTimeMillis(); + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(currentTime + unit.toMillis(timeToLive)); + expirationTime = cal.getTime(); } + return expirationTime; } } diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java index db0e3d4c5..c33c66042 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java @@ -41,14 +41,14 @@ public class QueueManager { private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class); - private MessageExpirationListener listener; private ExecutorService messageExecutor; + private LinkedBlockingQueue<QueueMessage> queue; private int max_thread_size; private int max_queue_size; private Util executionQueueUtil; public QueueManager() { - //do nothing + } /** @@ -90,14 +90,10 @@ public class QueueManager { } } - public void setListener(MessageExpirationListener listener) { - this.listener = listener; - } - /** * Injected by blueprint * - * @param executionQueueUtil Util to be set + * @param executionQueueUtil */ public void setExecutionQueueUtil(Util executionQueueUtil) { this.executionQueueUtil = executionQueueUtil; @@ -106,22 +102,16 @@ public class QueueManager { public boolean enqueueTask(QueueMessage queueMessage) { boolean isEnqueued = true; try { - messageExecutor.execute(() -> { - if (queueMessage.isExpired()) { - logger.debug("Message expired " + queueMessage.getMessage()); - if (listener != null) { - listener.onMessageExpiration(queueMessage.getMessage()); - } else { - logger.warn("Listener not available for expired message "); - } - } else { - queueMessage.getMessage().run(); - } - }); + messageExecutor.execute(() -> queueMessage.getMessage().run()); } catch (RejectedExecutionException ree) { isEnqueued = false; } return isEnqueued; } + + private boolean messageExpired(QueueMessage queueMessage) { + return queueMessage.getExpirationTime() != null && + queueMessage.getExpirationTime().getTime() < System.currentTimeMillis(); + } } diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java index bb48da7e5..75d1275d2 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java @@ -24,23 +24,22 @@ package org.onap.appc.executionqueue.impl.object; -import java.time.Instant; -import java.util.Objects; +import java.util.Date; public class QueueMessage<M extends Runnable> { - private final M message; - private final Instant expirationTime; - public QueueMessage(M message, Instant expirationTime){ + M message; + Date expirationTime; + public QueueMessage(M message, Date expirationTime){ this.message = message; - this.expirationTime = Objects.requireNonNull(expirationTime); + this.expirationTime = expirationTime; } public M getMessage() { return message; } - public boolean isExpired() { - return expirationTime.isBefore(Instant.now()); + public Date getExpirationTime() { + return expirationTime; } } diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/Listener.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/Listener.java deleted file mode 100644 index ce26fd92a..000000000 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/Listener.java +++ /dev/null @@ -1,42 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.executionqueue; - -import org.onap.appc.executionqueue.MessageExpirationListener; - - -public class Listener implements MessageExpirationListener { - - boolean listenerExecuted = false; - - public boolean isListenerExecuted() { - return listenerExecuted; - } - - @Override - public void onMessageExpiration(Object message) { - listenerExecuted = true; - } -} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/ExecutionQueueServiceTest.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/TestExecutionQueueService.java index 67f480d47..6884e9ccc 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/ExecutionQueueServiceTest.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/TestExecutionQueueService.java @@ -37,18 +37,20 @@ import org.onap.appc.executionqueue.impl.ExecutionQueueServiceImpl; import org.onap.appc.executionqueue.impl.QueueManager; import org.powermock.modules.junit4.PowerMockRunner; +import java.util.concurrent.TimeUnit; + import static org.mockito.Matchers.any; import static org.mockito.Mockito.times; @RunWith(PowerMockRunner.class) -public class ExecutionQueueServiceTest { +public class TestExecutionQueueService { @InjectMocks - private ExecutionQueueServiceImpl service; + ExecutionQueueServiceImpl service; @Spy - private QueueManager queueManager = new QueueManager(); + QueueManager queueManager = new QueueManager(); @Spy - private Util executionQueueUtil = new Util(); + Util executionQueueUtil = new Util(); @Before public void setup() { |