From 36bcd566167f2f91c0e8e7a304fce5f6bc150776 Mon Sep 17 00:00:00 2001 From: Anand Date: Thu, 4 Jan 2018 19:35:51 -0500 Subject: Include impacted changes for APPC-346,APPC-348 Issue-ID: APPC-347 Change-Id: I399bc2a1e0dfd481e103032a373bb80fce5baf41 Signed-off-by: Anand --- .../appc/executionqueue/ExecutionQueueService.java | 1 - .../org/onap/appc/executionqueue/helper/Util.java | 20 +++---- .../impl/ExecutionQueueServiceFactory.java | 38 ------------ .../impl/ExecutionQueueServiceImpl.java | 61 +++++++++---------- .../appc/executionqueue/impl/QueueManager.java | 28 +++------ .../executionqueue/impl/object/QueueMessage.java | 15 +++-- .../executionqueue/ExecutionQueueServiceTest.java | 68 --------------------- .../org/onap/appc/executionqueue/Listener.java | 42 ------------- .../executionqueue/TestExecutionQueueService.java | 70 ++++++++++++++++++++++ 9 files changed, 124 insertions(+), 219 deletions(-) delete mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java delete mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/ExecutionQueueServiceTest.java delete mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/Listener.java create mode 100644 appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/TestExecutionQueueService.java (limited to 'appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src') diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java index 1423962ef..2c4aa0853 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java @@ -31,5 +31,4 @@ import org.onap.appc.exceptions.APPCException; public interface ExecutionQueueService { void putMessage(M message) throws APPCException; void putMessage(M message, long timeout, TimeUnit unit) throws APPCException; - void registerMessageExpirationListener(MessageExpirationListener listener); } diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java index 164a8b563..09d49deef 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java @@ -24,8 +24,6 @@ package org.onap.appc.executionqueue.helper; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; import org.onap.appc.configuration.Configuration; import org.onap.appc.configuration.ConfigurationFactory; @@ -35,11 +33,10 @@ import java.util.concurrent.atomic.AtomicInteger; public class Util { - private final EELFLogger logger = EELFManager.getInstance().getLogger(Util.class); - private final int default_queue_size = 10; - private final int default_threadpool_size = 10; - private final String queue_size_key = "appc.dispatcher.executionqueue.backlog.size"; - private final String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size"; + private int default_queue_size = 10; + private int default_threadpool_size = 10; + private String queue_size_key = "appc.dispatcher.executionqueue.backlog.size"; + private String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size"; private Configuration configuration; @@ -48,7 +45,6 @@ public class Util { *

Used by blueprint. */ public void init() { - configuration = ConfigurationFactory.getConfiguration(); } @@ -59,7 +55,7 @@ public class Util { try { size = Integer.parseInt(sizeStr); } catch (NumberFormatException e) { - logger.error("Error while parse key:" + queue_size_key + " got from configuration " + e.getMessage(), e); + } return size; @@ -72,8 +68,7 @@ public class Util { try { size = Integer.parseInt(sizeStr); } catch (NumberFormatException e) { - logger.error("Error while parse key:" + threadpool_size_key + " got from configuration " - + e.getMessage(), e); + } return size; @@ -89,8 +84,7 @@ public class Util { Thread t = factory.newThread(r); t.setDaemon(isDaemon); if (threadNamePrefix != null && !threadNamePrefix.isEmpty()) { - final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter - .incrementAndGet()); + final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter.incrementAndGet()); t.setName(threadName); } return t; diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java deleted file mode 100644 index f071be6f0..000000000 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.executionqueue.impl; - -import org.onap.appc.executionqueue.ExecutionQueueService; - -public class ExecutionQueueServiceFactory { - - private static class ExecutionQueueServiceHolder { - public static final ExecutionQueueService executionQueueService = new ExecutionQueueServiceImpl(); - } - - public static ExecutionQueueService getExecutionQueueService() { - return ExecutionQueueServiceHolder.executionQueueService; - } -} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java index 0634a0eb2..027cc9d4b 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java @@ -31,58 +31,59 @@ import org.onap.appc.executionqueue.ExecutionQueueService; import org.onap.appc.executionqueue.MessageExpirationListener; import org.onap.appc.executionqueue.impl.object.QueueMessage; -import java.time.Instant; +import java.util.Calendar; +import java.util.Date; import java.util.concurrent.TimeUnit; public class ExecutionQueueServiceImpl implements ExecutionQueueService { - private static final EELFLogger logger = - EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); + private final EELFLogger logger = EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class); private QueueManager queueManager; - public ExecutionQueueServiceImpl() { - //do nothing + public ExecutionQueueServiceImpl(){ + + } + + @Override + public void putMessage(M message) throws APPCException { + this.putMessage(message,-1,null); } /** * Injected by blueprint - * - * @param queueManager queue manager to be set + * @param queueManager */ public void setQueueManager(QueueManager queueManager) { this.queueManager = queueManager; } @Override - public void putMessage(M message) throws APPCException { - this.putMessage(message, -1, null); - } + public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{ + QueueMessage queueMessage; - @Override - public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException { - Instant expirationTime = calculateExpirationTime(timeout, unit); - boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<>(message, expirationTime)); - if (!enqueueTask) { - logger.error("Error in putMessage method of ExecutionQueueServiceImpl"); - throw new APPCException("Failed to put message in queue"); + try { + Date expirationTime = calculateExpirationTime(timeout,unit); + queueMessage = new QueueMessage(message,expirationTime); + boolean enqueueTask = queueManager.enqueueTask(queueMessage); + if(!enqueueTask){ + throw new APPCException("failed to put message in queue"); + } + } catch (Exception e) { + logger.error("Error in putMessage method of ExecutionQueueServiceImpl" + e.getMessage()); + throw new APPCException(e); } } - @Override - public void registerMessageExpirationListener(MessageExpirationListener listener) { - queueManager.setListener(listener); - } - - private Instant calculateExpirationTime(long timeToLive, TimeUnit unit) { - if (timeToLive > 0 && unit != null) { - // as of Java 8, there is no built-in conversion method from - // TimeUnit to ChronoUnit; do it manually - return Instant.now().plusMillis(unit.toMillis(timeToLive)); - } else { - // never expires - return Instant.MAX; + private Date calculateExpirationTime(long timeToLive, TimeUnit unit) { + Date expirationTime = null; + if(timeToLive > 0){ + long currentTime = System.currentTimeMillis(); + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(currentTime + unit.toMillis(timeToLive)); + expirationTime = cal.getTime(); } + return expirationTime; } } diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java index db0e3d4c5..c33c66042 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java @@ -41,14 +41,14 @@ public class QueueManager { private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class); - private MessageExpirationListener listener; private ExecutorService messageExecutor; + private LinkedBlockingQueue queue; private int max_thread_size; private int max_queue_size; private Util executionQueueUtil; public QueueManager() { - //do nothing + } /** @@ -90,14 +90,10 @@ public class QueueManager { } } - public void setListener(MessageExpirationListener listener) { - this.listener = listener; - } - /** * Injected by blueprint * - * @param executionQueueUtil Util to be set + * @param executionQueueUtil */ public void setExecutionQueueUtil(Util executionQueueUtil) { this.executionQueueUtil = executionQueueUtil; @@ -106,22 +102,16 @@ public class QueueManager { public boolean enqueueTask(QueueMessage queueMessage) { boolean isEnqueued = true; try { - messageExecutor.execute(() -> { - if (queueMessage.isExpired()) { - logger.debug("Message expired " + queueMessage.getMessage()); - if (listener != null) { - listener.onMessageExpiration(queueMessage.getMessage()); - } else { - logger.warn("Listener not available for expired message "); - } - } else { - queueMessage.getMessage().run(); - } - }); + messageExecutor.execute(() -> queueMessage.getMessage().run()); } catch (RejectedExecutionException ree) { isEnqueued = false; } return isEnqueued; } + + private boolean messageExpired(QueueMessage queueMessage) { + return queueMessage.getExpirationTime() != null && + queueMessage.getExpirationTime().getTime() < System.currentTimeMillis(); + } } diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java index bb48da7e5..75d1275d2 100644 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java @@ -24,23 +24,22 @@ package org.onap.appc.executionqueue.impl.object; -import java.time.Instant; -import java.util.Objects; +import java.util.Date; public class QueueMessage { - private final M message; - private final Instant expirationTime; - public QueueMessage(M message, Instant expirationTime){ + M message; + Date expirationTime; + public QueueMessage(M message, Date expirationTime){ this.message = message; - this.expirationTime = Objects.requireNonNull(expirationTime); + this.expirationTime = expirationTime; } public M getMessage() { return message; } - public boolean isExpired() { - return expirationTime.isBefore(Instant.now()); + public Date getExpirationTime() { + return expirationTime; } } diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/ExecutionQueueServiceTest.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/ExecutionQueueServiceTest.java deleted file mode 100644 index 67f480d47..000000000 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/ExecutionQueueServiceTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.executionqueue; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mockito; -import org.mockito.Spy; -import org.onap.appc.exceptions.APPCException; -import org.onap.appc.executionqueue.helper.Util; -import org.onap.appc.executionqueue.impl.ExecutionQueueServiceImpl; -import org.onap.appc.executionqueue.impl.QueueManager; -import org.powermock.modules.junit4.PowerMockRunner; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.times; - -@RunWith(PowerMockRunner.class) -public class ExecutionQueueServiceTest { - - @InjectMocks - private ExecutionQueueServiceImpl service; - @Spy - private QueueManager queueManager = new QueueManager(); - @Spy - private Util executionQueueUtil = new Util(); - - @Before - public void setup() { - Mockito.doReturn(true).when(queueManager).enqueueTask(any()); - } - - @Test - public void testPositiveFlow() { - Message message = new Message(); - try { - service.putMessage(message); - Mockito.verify(queueManager, times(1)).enqueueTask(any()); - } catch (APPCException e) { - Assert.fail(e.toString()); - } - } -} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/Listener.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/Listener.java deleted file mode 100644 index ce26fd92a..000000000 --- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/Listener.java +++ /dev/null @@ -1,42 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP : APPC - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Copyright (C) 2017 Amdocs - * ============================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * ============LICENSE_END========================================================= - */ - -package org.onap.appc.executionqueue; - -import org.onap.appc.executionqueue.MessageExpirationListener; - - -public class Listener implements MessageExpirationListener { - - boolean listenerExecuted = false; - - public boolean isListenerExecuted() { - return listenerExecuted; - } - - @Override - public void onMessageExpiration(Object message) { - listenerExecuted = true; - } -} diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/TestExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/TestExecutionQueueService.java new file mode 100644 index 000000000..6884e9ccc --- /dev/null +++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/TestExecutionQueueService.java @@ -0,0 +1,70 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.executionqueue; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.onap.appc.exceptions.APPCException; +import org.onap.appc.executionqueue.helper.Util; +import org.onap.appc.executionqueue.impl.ExecutionQueueServiceImpl; +import org.onap.appc.executionqueue.impl.QueueManager; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; + +@RunWith(PowerMockRunner.class) +public class TestExecutionQueueService { + + @InjectMocks + ExecutionQueueServiceImpl service; + @Spy + QueueManager queueManager = new QueueManager(); + @Spy + Util executionQueueUtil = new Util(); + + @Before + public void setup() { + Mockito.doReturn(true).when(queueManager).enqueueTask(any()); + } + + @Test + public void testPositiveFlow() { + Message message = new Message(); + try { + service.putMessage(message); + Mockito.verify(queueManager, times(1)).enqueueTask(any()); + } catch (APPCException e) { + Assert.fail(e.toString()); + } + } +} -- cgit 1.2.3-korg