aboutsummaryrefslogtreecommitdiffstats
path: root/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib
diff options
context:
space:
mode:
Diffstat (limited to 'appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib')
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/pom.xml70
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java1
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java20
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java38
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java61
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java28
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java15
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/Listener.java42
-rw-r--r--appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/TestExecutionQueueService.java (renamed from appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/ExecutionQueueServiceTest.java)10
9 files changed, 109 insertions, 176 deletions
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/pom.xml b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/pom.xml
index cc89f3c3f..75d4fb09a 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/pom.xml
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/pom.xml
@@ -1,33 +1,59 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ============LICENSE_START=======================================================
+ ONAP : APPC
+ ================================================================================
+ Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ ================================================================================
+ Copyright (C) 2017 Amdocs
+ =============================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+ ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ ============LICENSE_END=========================================================
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onap.appc</groupId>
<artifactId>appc-dispatcher-common</artifactId>
<version>1.3.0-SNAPSHOT</version>
</parent>
- <artifactId>execution-queue-management-lib</artifactId>
- <packaging>bundle</packaging>
- <name>execution-queue-management-lib</name>
- <url>http://maven.apache.org</url>
+ <artifactId>execution-queue-management-lib</artifactId>
+ <packaging>bundle</packaging>
+ <name>APPC Dispatcher Common - Qxecution Queue Mgmt lib</name>
+ <url>http://maven.apache.org</url>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
- <dependencies>
- <dependency>
- <groupId>org.onap.appc</groupId>
- <artifactId>appc-common</artifactId>
- <version>${project.version}</version>
- </dependency>
+ <dependencies>
+ <dependency>
+ <groupId>org.onap.appc</groupId>
+ <artifactId>appc-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
- <groupId>com.att.eelf</groupId>
- <artifactId>eelf-core</artifactId>
- </dependency>
- </dependencies>
+ <groupId>com.att.eelf</groupId>
+ <artifactId>eelf-core</artifactId>
+ </dependency>
+ </dependencies>
- <build>
+ <build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
@@ -37,7 +63,9 @@
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Bundle-Version>${project.version}</Bundle-Version>
<Embed-Transitive>true</Embed-Transitive>
- <Export-Package>org.onap.appc.executionqueue,org.onap.appc.executionqueue.impl</Export-Package>
+ <Export-Package>
+ org.onap.appc.executionqueue,org.onap.appc.executionqueue.impl
+ </Export-Package>
<Import-Package>
*;resolution:=optional
</Import-Package>
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java
index 1423962ef..2c4aa0853 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/ExecutionQueueService.java
@@ -31,5 +31,4 @@ import org.onap.appc.exceptions.APPCException;
public interface ExecutionQueueService<M extends Runnable> {
void putMessage(M message) throws APPCException;
void putMessage(M message, long timeout, TimeUnit unit) throws APPCException;
- void registerMessageExpirationListener(MessageExpirationListener listener);
}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java
index 164a8b563..09d49deef 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/helper/Util.java
@@ -24,8 +24,6 @@
package org.onap.appc.executionqueue.helper;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
import org.onap.appc.configuration.Configuration;
import org.onap.appc.configuration.ConfigurationFactory;
@@ -35,11 +33,10 @@ import java.util.concurrent.atomic.AtomicInteger;
public class Util {
- private final EELFLogger logger = EELFManager.getInstance().getLogger(Util.class);
- private final int default_queue_size = 10;
- private final int default_threadpool_size = 10;
- private final String queue_size_key = "appc.dispatcher.executionqueue.backlog.size";
- private final String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size";
+ private int default_queue_size = 10;
+ private int default_threadpool_size = 10;
+ private String queue_size_key = "appc.dispatcher.executionqueue.backlog.size";
+ private String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size";
private Configuration configuration;
@@ -48,7 +45,6 @@ public class Util {
* <p>Used by blueprint.
*/
public void init() {
-
configuration = ConfigurationFactory.getConfiguration();
}
@@ -59,7 +55,7 @@ public class Util {
try {
size = Integer.parseInt(sizeStr);
} catch (NumberFormatException e) {
- logger.error("Error while parse key:" + queue_size_key + " got from configuration " + e.getMessage(), e);
+
}
return size;
@@ -72,8 +68,7 @@ public class Util {
try {
size = Integer.parseInt(sizeStr);
} catch (NumberFormatException e) {
- logger.error("Error while parse key:" + threadpool_size_key + " got from configuration "
- + e.getMessage(), e);
+
}
return size;
@@ -89,8 +84,7 @@ public class Util {
Thread t = factory.newThread(r);
t.setDaemon(isDaemon);
if (threadNamePrefix != null && !threadNamePrefix.isEmpty()) {
- final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter
- .incrementAndGet());
+ final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter.incrementAndGet());
t.setName(threadName);
}
return t;
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java
deleted file mode 100644
index f071be6f0..000000000
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.executionqueue.impl;
-
-import org.onap.appc.executionqueue.ExecutionQueueService;
-
-public class ExecutionQueueServiceFactory {
-
- private static class ExecutionQueueServiceHolder {
- public static final ExecutionQueueService executionQueueService = new ExecutionQueueServiceImpl();
- }
-
- public static ExecutionQueueService getExecutionQueueService() {
- return ExecutionQueueServiceHolder.executionQueueService;
- }
-}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
index 0634a0eb2..027cc9d4b 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
@@ -31,58 +31,59 @@ import org.onap.appc.executionqueue.ExecutionQueueService;
import org.onap.appc.executionqueue.MessageExpirationListener;
import org.onap.appc.executionqueue.impl.object.QueueMessage;
-import java.time.Instant;
+import java.util.Calendar;
+import java.util.Date;
import java.util.concurrent.TimeUnit;
public class ExecutionQueueServiceImpl<M extends Runnable> implements ExecutionQueueService<M> {
- private static final EELFLogger logger =
- EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class);
+ private final EELFLogger logger = EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class);
private QueueManager queueManager;
- public ExecutionQueueServiceImpl() {
- //do nothing
+ public ExecutionQueueServiceImpl(){
+
+ }
+
+ @Override
+ public void putMessage(M message) throws APPCException {
+ this.putMessage(message,-1,null);
}
/**
* Injected by blueprint
- *
- * @param queueManager queue manager to be set
+ * @param queueManager
*/
public void setQueueManager(QueueManager queueManager) {
this.queueManager = queueManager;
}
@Override
- public void putMessage(M message) throws APPCException {
- this.putMessage(message, -1, null);
- }
+ public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{
+ QueueMessage queueMessage;
- @Override
- public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException {
- Instant expirationTime = calculateExpirationTime(timeout, unit);
- boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<>(message, expirationTime));
- if (!enqueueTask) {
- logger.error("Error in putMessage method of ExecutionQueueServiceImpl");
- throw new APPCException("Failed to put message in queue");
+ try {
+ Date expirationTime = calculateExpirationTime(timeout,unit);
+ queueMessage = new QueueMessage(message,expirationTime);
+ boolean enqueueTask = queueManager.enqueueTask(queueMessage);
+ if(!enqueueTask){
+ throw new APPCException("failed to put message in queue");
+ }
+ } catch (Exception e) {
+ logger.error("Error in putMessage method of ExecutionQueueServiceImpl" + e.getMessage());
+ throw new APPCException(e);
}
}
- @Override
- public void registerMessageExpirationListener(MessageExpirationListener listener) {
- queueManager.setListener(listener);
- }
-
- private Instant calculateExpirationTime(long timeToLive, TimeUnit unit) {
- if (timeToLive > 0 && unit != null) {
- // as of Java 8, there is no built-in conversion method from
- // TimeUnit to ChronoUnit; do it manually
- return Instant.now().plusMillis(unit.toMillis(timeToLive));
- } else {
- // never expires
- return Instant.MAX;
+ private Date calculateExpirationTime(long timeToLive, TimeUnit unit) {
+ Date expirationTime = null;
+ if(timeToLive > 0){
+ long currentTime = System.currentTimeMillis();
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(currentTime + unit.toMillis(timeToLive));
+ expirationTime = cal.getTime();
}
+ return expirationTime;
}
}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java
index db0e3d4c5..c33c66042 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/QueueManager.java
@@ -41,14 +41,14 @@ public class QueueManager {
private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class);
- private MessageExpirationListener listener;
private ExecutorService messageExecutor;
+ private LinkedBlockingQueue<QueueMessage> queue;
private int max_thread_size;
private int max_queue_size;
private Util executionQueueUtil;
public QueueManager() {
- //do nothing
+
}
/**
@@ -90,14 +90,10 @@ public class QueueManager {
}
}
- public void setListener(MessageExpirationListener listener) {
- this.listener = listener;
- }
-
/**
* Injected by blueprint
*
- * @param executionQueueUtil Util to be set
+ * @param executionQueueUtil
*/
public void setExecutionQueueUtil(Util executionQueueUtil) {
this.executionQueueUtil = executionQueueUtil;
@@ -106,22 +102,16 @@ public class QueueManager {
public boolean enqueueTask(QueueMessage queueMessage) {
boolean isEnqueued = true;
try {
- messageExecutor.execute(() -> {
- if (queueMessage.isExpired()) {
- logger.debug("Message expired " + queueMessage.getMessage());
- if (listener != null) {
- listener.onMessageExpiration(queueMessage.getMessage());
- } else {
- logger.warn("Listener not available for expired message ");
- }
- } else {
- queueMessage.getMessage().run();
- }
- });
+ messageExecutor.execute(() -> queueMessage.getMessage().run());
} catch (RejectedExecutionException ree) {
isEnqueued = false;
}
return isEnqueued;
}
+
+ private boolean messageExpired(QueueMessage queueMessage) {
+ return queueMessage.getExpirationTime() != null &&
+ queueMessage.getExpirationTime().getTime() < System.currentTimeMillis();
+ }
}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java
index bb48da7e5..75d1275d2 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/onap/appc/executionqueue/impl/object/QueueMessage.java
@@ -24,23 +24,22 @@
package org.onap.appc.executionqueue.impl.object;
-import java.time.Instant;
-import java.util.Objects;
+import java.util.Date;
public class QueueMessage<M extends Runnable> {
- private final M message;
- private final Instant expirationTime;
- public QueueMessage(M message, Instant expirationTime){
+ M message;
+ Date expirationTime;
+ public QueueMessage(M message, Date expirationTime){
this.message = message;
- this.expirationTime = Objects.requireNonNull(expirationTime);
+ this.expirationTime = expirationTime;
}
public M getMessage() {
return message;
}
- public boolean isExpired() {
- return expirationTime.isBefore(Instant.now());
+ public Date getExpirationTime() {
+ return expirationTime;
}
}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/Listener.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/Listener.java
deleted file mode 100644
index ce26fd92a..000000000
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/Listener.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.appc.executionqueue;
-
-import org.onap.appc.executionqueue.MessageExpirationListener;
-
-
-public class Listener implements MessageExpirationListener {
-
- boolean listenerExecuted = false;
-
- public boolean isListenerExecuted() {
- return listenerExecuted;
- }
-
- @Override
- public void onMessageExpiration(Object message) {
- listenerExecuted = true;
- }
-}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/ExecutionQueueServiceTest.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/TestExecutionQueueService.java
index 67f480d47..6884e9ccc 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/ExecutionQueueServiceTest.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/onap/appc/executionqueue/TestExecutionQueueService.java
@@ -37,18 +37,20 @@ import org.onap.appc.executionqueue.impl.ExecutionQueueServiceImpl;
import org.onap.appc.executionqueue.impl.QueueManager;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.util.concurrent.TimeUnit;
+
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.times;
@RunWith(PowerMockRunner.class)
-public class ExecutionQueueServiceTest {
+public class TestExecutionQueueService {
@InjectMocks
- private ExecutionQueueServiceImpl service;
+ ExecutionQueueServiceImpl service;
@Spy
- private QueueManager queueManager = new QueueManager();
+ QueueManager queueManager = new QueueManager();
@Spy
- private Util executionQueueUtil = new Util();
+ Util executionQueueUtil = new Util();
@Before
public void setup() {