summaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorDan Timoney <dtimoney@att.com>2020-04-10 12:39:06 +0000
committerGerrit Code Review <gerrit@onap.org>2020-04-10 12:39:06 +0000
commit62edf48439a0a6b59619c03280b96c1226ad77f0 (patch)
tree5375d77ba4cc3178f80ef3a7fefa50bad55b22b7 /lib
parent4b87cf4d3c50dacb356c8e53a80bbabcba14b621 (diff)
parent1c1791c5498dad7b7fd6b1591e0c5844d4c6c601 (diff)
Merge "Add new modules: Resource Lock and Doorman"
Diffstat (limited to 'lib')
-rw-r--r--lib/doorman/pom.xml54
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java11
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java10
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java6
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java8
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java12
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java30
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java307
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java5
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java5
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java33
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java59
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java5
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java21
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java9
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java5
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java12
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java357
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java57
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java336
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java236
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java273
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java319
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java91
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java222
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java120
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java21
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java10
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java21
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java53
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java92
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java24
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java26
-rw-r--r--lib/doorman/src/test/resources/it/tc1/result.json31
-rw-r--r--lib/doorman/src/test/resources/it/tc1/test.json67
-rw-r--r--lib/doorman/src/test/resources/it/tc2/request_entity1.txt4
-rw-r--r--lib/doorman/src/test/resources/it/tc2/request_entity2.txt4
-rw-r--r--lib/doorman/src/test/resources/it/tc2/request_other.txt4
-rw-r--r--lib/doorman/src/test/resources/it/tc2/response.txt3
-rw-r--r--lib/doorman/src/test/resources/it/tc2/result.json94
-rw-r--r--lib/doorman/src/test/resources/it/tc2/result2.json76
-rw-r--r--lib/doorman/src/test/resources/it/tc2/test.json202
-rw-r--r--lib/doorman/src/test/resources/schema.sql58
-rw-r--r--lib/doorman/src/test/resources/test1.json36
-rw-r--r--lib/rlock/pom.xml43
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java14
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java173
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java13
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java122
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java20
-rw-r--r--lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java35
-rw-r--r--lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java51
-rw-r--r--lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java92
-rw-r--r--lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java24
-rw-r--r--lib/rlock/src/test/resources/schema.sql10
55 files changed, 4026 insertions, 0 deletions
diff --git a/lib/doorman/pom.xml b/lib/doorman/pom.xml
new file mode 100644
index 000000000..e7883a944
--- /dev/null
+++ b/lib/doorman/pom.xml
@@ -0,0 +1,54 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.ccsdk.features</groupId>
+ <artifactId>ccsdk-features</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.onap.ccsdk.features.lib.doorman</groupId>
+ <artifactId>doorman</artifactId>
+
+ <description>Doorman - Request prioritization and agregation queue</description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.ccsdk.features.lib.rlock</groupId>
+ <artifactId>rlock</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java
new file mode 100644
index 000000000..b6af1731e
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java
@@ -0,0 +1,11 @@
+package org.onap.ccsdk.features.lib.doorman;
+
+import org.onap.ccsdk.features.lib.doorman.data.MessageData;
+import org.onap.ccsdk.features.lib.doorman.data.Queue;
+
+public interface MessageClassifier {
+
+ Queue determineQueue(MessageData request);
+
+ String getExtMessageId(MessageData request);
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java
new file mode 100644
index 000000000..1ff811baa
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java
@@ -0,0 +1,10 @@
+package org.onap.ccsdk.features.lib.doorman;
+
+import org.onap.ccsdk.features.lib.doorman.data.MessageData;
+
+public interface MessageInterceptor {
+
+ MessageData processRequest(MessageData request);
+
+ void processResponse(MessageData response);
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java
new file mode 100644
index 000000000..4c9886425
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java
@@ -0,0 +1,6 @@
+package org.onap.ccsdk.features.lib.doorman;
+
+public interface MessageInterceptorFactory {
+
+ MessageInterceptor create();
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java
new file mode 100644
index 000000000..0bb05a34c
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java
@@ -0,0 +1,8 @@
+package org.onap.ccsdk.features.lib.doorman;
+
+import org.onap.ccsdk.features.lib.doorman.data.MessageData;
+
+public interface MessageProcessor {
+
+ void processMessage(MessageData request);
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java
new file mode 100644
index 000000000..4285393a4
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java
@@ -0,0 +1,12 @@
+package org.onap.ccsdk.features.lib.doorman;
+
+import java.util.List;
+import java.util.Map;
+import org.onap.ccsdk.features.lib.doorman.data.Event;
+import org.onap.ccsdk.features.lib.doorman.data.Message;
+import org.onap.ccsdk.features.lib.doorman.data.MessageAction;
+
+public interface MessageQueueHandler {
+
+ Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue);
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java
new file mode 100644
index 000000000..764faa9c8
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java
@@ -0,0 +1,30 @@
+package org.onap.ccsdk.features.lib.doorman.dao;
+
+import java.util.Date;
+import java.util.List;
+import org.onap.ccsdk.features.lib.doorman.data.Message;
+import org.onap.ccsdk.features.lib.doorman.data.MessageAction;
+import org.onap.ccsdk.features.lib.doorman.data.MessageData;
+import org.onap.ccsdk.features.lib.doorman.data.MessageStatus;
+import org.onap.ccsdk.features.lib.doorman.data.Queue;
+
+public interface MessageDao {
+
+ long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp);
+
+ void updateMessageStarted(long messageId, Date timestamp);
+
+ void updateMessageCompleted(long messageId, String resolution, Date timestamp);
+
+ void updateMessageResponse(long messageId, Date timestamp, MessageData response);
+
+ void addStatus(long messageId, MessageStatus status);
+
+ void addAction(long messageId, MessageAction action);
+
+ void updateActionDone(long actionId, Date now);
+
+ List<Message> readMessageQueue(Queue queue);
+
+ MessageAction getNextAction(long messageId);
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java
new file mode 100644
index 000000000..b3ecf3d4b
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java
@@ -0,0 +1,307 @@
+package org.onap.ccsdk.features.lib.doorman.dao;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.onap.ccsdk.features.lib.doorman.data.ActionStatus;
+import org.onap.ccsdk.features.lib.doorman.data.Message;
+import org.onap.ccsdk.features.lib.doorman.data.MessageAction;
+import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue;
+import org.onap.ccsdk.features.lib.doorman.data.MessageData;
+import org.onap.ccsdk.features.lib.doorman.data.MessageStatus;
+import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue;
+import org.onap.ccsdk.features.lib.doorman.data.Queue;
+import org.onap.ccsdk.features.lib.doorman.util.JsonUtil;
+
+public class MessageDaoImpl implements MessageDao {
+
+ private DataSource dataSource;
+
+ @Override
+ public long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp) {
+ try (Connection con = dataSource.getConnection()) {
+ try {
+ con.setAutoCommit(false);
+ long id = 0;
+ String sql = "INSERT INTO message (ext_message_id, request_param, request_body, arrived_timestamp, queue_type, queue_id) VALUES (?, ?, ?, ?, ?, ?)";
+ try (PreparedStatement ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
+ ps.setString(1, extMessageId);
+ ps.setString(2, JsonUtil.dataToJson(request.param));
+ ps.setString(3, request.body);
+ ps.setTimestamp(4, new Timestamp(timestamp.getTime()));
+ if (queue != null) {
+ ps.setString(5, queue.type);
+ ps.setString(6, queue.id);
+ } else {
+ ps.setNull(5, Types.VARCHAR);
+ ps.setNull(6, Types.VARCHAR);
+ }
+ ps.executeUpdate();
+ try (ResultSet rs = ps.getGeneratedKeys()) {
+ rs.next();
+ id = rs.getLong(1);
+ }
+ }
+ con.commit();
+ return id;
+ } catch (SQLException ex) {
+ con.rollback();
+ throw ex;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error inserting message to DB: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void updateMessageStarted(long messageId, Date timestamp) {
+ updateMessageStatus("started_timestamp", messageId, null, timestamp);
+ }
+
+ @Override
+ public void updateMessageCompleted(long messageId, String resolution, Date timestamp) {
+ updateMessageStatus("completed_timestamp", messageId, resolution, timestamp);
+ }
+
+ private void updateMessageStatus(String timestampColumn, long messageId, String resolution, Date timestamp) {
+ try (Connection con = dataSource.getConnection()) {
+ try {
+ con.setAutoCommit(false);
+ String sql = "UPDATE message SET " + timestampColumn + " = ? WHERE message_id = ?";
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setTimestamp(1, new Timestamp(timestamp.getTime()));
+ ps.setLong(2, messageId);
+ ps.executeUpdate();
+ }
+ con.commit();
+ } catch (SQLException ex) {
+ con.rollback();
+ throw ex;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error updating message status in DB: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void updateMessageResponse(long messageId, Date timestamp, MessageData response) {
+ try (Connection con = dataSource.getConnection()) {
+ try {
+ con.setAutoCommit(false);
+ String sql = "UPDATE message SET response_timestamp = ?, response_param = ?, response_body = ? WHERE message_id = ?";
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setTimestamp(1, new Timestamp(timestamp.getTime()));
+ ps.setString(2, JsonUtil.dataToJson(response.param));
+ ps.setString(3, response.body);
+ ps.setLong(4, messageId);
+ ps.executeUpdate();
+ }
+ con.commit();
+ } catch (SQLException ex) {
+ con.rollback();
+ throw ex;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error updating message response in DB: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void addStatus(long messageId, MessageStatus status) {
+ try (Connection con = dataSource.getConnection()) {
+ try {
+ con.setAutoCommit(false);
+ String sql = "INSERT INTO message_status (message_id, status, status_timestamp) VALUES (?, ?, ?)";
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setLong(1, messageId);
+ ps.setString(2, status.status.toString());
+ ps.setTimestamp(3, new Timestamp(status.timestamp.getTime()));
+ ps.executeUpdate();
+ }
+ con.commit();
+ } catch (SQLException ex) {
+ con.rollback();
+ throw ex;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error inserting message status to DB: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void addAction(long messageId, MessageAction action) {
+ try (Connection con = dataSource.getConnection()) {
+ try {
+ con.setAutoCommit(false);
+ String sql = "INSERT INTO message_action (message_id, action, action_status, resolution, action_timestamp, done_timestamp, hold_time, response_param, response_body) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setLong(1, messageId);
+ ps.setString(2, action.action.toString());
+ ps.setString(3, action.actionStatus.toString());
+ ps.setString(4, action.resolution);
+ ps.setTimestamp(5, new Timestamp(action.timestamp.getTime()));
+ if (action.doneTimestamp != null) {
+ ps.setTimestamp(6, new Timestamp(action.doneTimestamp.getTime()));
+ } else {
+ ps.setNull(6, Types.TIMESTAMP);
+ }
+ ps.setInt(7, action.holdTime);
+ if (action.returnResponse != null) {
+ ps.setString(8, JsonUtil.dataToJson(action.returnResponse.param));
+ ps.setString(9, action.returnResponse.body);
+ } else {
+ ps.setNull(8, Types.VARCHAR);
+ ps.setNull(9, Types.VARCHAR);
+ }
+ ps.executeUpdate();
+ }
+ con.commit();
+ } catch (SQLException ex) {
+ con.rollback();
+ throw ex;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error inserting message action to DB: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void updateActionDone(long actionId, Date now) {
+ try (Connection con = dataSource.getConnection()) {
+ try {
+ con.setAutoCommit(false);
+ String sql = "UPDATE message_action SET action_status = ?, done_timestamp = ? WHERE message_action_id = ?";
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setString(1, ActionStatus.DONE.toString());
+ ps.setTimestamp(2, new Timestamp(now.getTime()));
+ ps.setLong(3, actionId);
+ ps.executeUpdate();
+ }
+ con.commit();
+ } catch (SQLException ex) {
+ con.rollback();
+ throw ex;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error updating action in DB: " + e.getMessage(), e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<Message> readMessageQueue(Queue queue) {
+ List<Message> messageList = new ArrayList<>();
+ try (Connection con = dataSource.getConnection()) {
+ String msql = "SELECT * FROM message WHERE queue_type = ? AND queue_id = ?";
+ String ssql = "SELECT * FROM message_status WHERE message_id = ? ORDER BY message_status_id DESC";
+ String asql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY message_action_id DESC";
+ try (PreparedStatement mps = con.prepareStatement(msql); PreparedStatement sps = con.prepareStatement(ssql); PreparedStatement aps = con.prepareStatement(asql)) {
+ mps.setString(1, queue.type);
+ mps.setString(2, queue.id);
+ try (ResultSet mrs = mps.executeQuery()) {
+ while (mrs.next()) {
+ Message m = new Message();
+ m.messageId = mrs.getLong("message_id");
+ m.extMessageId = mrs.getString("ext_message_id");
+ m.request = new MessageData();
+ m.request.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("request_param"));
+ m.request.body = mrs.getString("request_body");
+ m.response = new MessageData();
+ m.response.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("response_param"));
+ m.response.body = mrs.getString("response_body");
+ m.queue = new Queue();
+ m.queue.type = mrs.getString("queue_type");
+ m.queue.id = mrs.getString("queue_id");
+ m.arrivedTimestamp = mrs.getTimestamp("arrived_timestamp");
+ m.startedTimestamp = mrs.getTimestamp("started_timestamp");
+ m.completedTimestamp = mrs.getTimestamp("completed_timestamp");
+ m.responseTimestamp = mrs.getTimestamp("response_timestamp");
+ m.statusHistory = new ArrayList<>();
+ m.actionHistory = new ArrayList<>();
+ messageList.add(m);
+
+ sps.setLong(1, m.messageId);
+ try (ResultSet srs = sps.executeQuery()) {
+ while (srs.next()) {
+ MessageStatus s = new MessageStatus();
+ s.status = MessageStatusValue.valueOf(srs.getString("status"));
+ s.timestamp = srs.getTimestamp("status_timestamp");
+ m.statusHistory.add(s);
+ }
+ }
+
+ aps.setLong(1, m.messageId);
+ try (ResultSet ars = aps.executeQuery()) {
+ while (ars.next()) {
+ MessageAction a = new MessageAction();
+ a.actionId = ars.getLong("message_action_id");
+ a.action = MessageActionValue.valueOf(ars.getString("action"));
+ a.actionStatus = ActionStatus.valueOf(ars.getString("action_status"));
+ a.timestamp = ars.getTimestamp("action_timestamp");
+ a.doneTimestamp = ars.getTimestamp("done_timestamp");
+ a.holdTime = ars.getInt("hold_time");
+ a.returnResponse = new MessageData();
+ a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(ars.getString("response_param"));
+ a.returnResponse.body = ars.getString("response_body");
+ if (a.returnResponse.param == null && a.returnResponse.body == null) {
+ a.returnResponse = null;
+ }
+ a.resolution = ars.getString("resolution");
+ m.actionHistory.add(a);
+ }
+ }
+ }
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e);
+ }
+ return messageList;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public MessageAction getNextAction(long messageId) {
+ try (Connection con = dataSource.getConnection()) {
+ String sql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY action_timestamp DESC";
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setLong(1, messageId);
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ MessageAction a = new MessageAction();
+ a.actionId = rs.getLong("message_action_id");
+ a.action = MessageActionValue.valueOf(rs.getString("action"));
+ a.actionStatus = ActionStatus.valueOf(rs.getString("action_status"));
+ a.timestamp = rs.getTimestamp("action_timestamp");
+ a.doneTimestamp = rs.getTimestamp("done_timestamp");
+ a.holdTime = rs.getInt("hold_time");
+ a.returnResponse = new MessageData();
+ a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(rs.getString("response_param"));
+ a.returnResponse.body = rs.getString("response_body");
+ if (a.returnResponse.param == null && a.returnResponse.body == null) {
+ a.returnResponse = null;
+ }
+ a.resolution = rs.getString("resolution");
+ return a;
+ }
+ return null;
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void setDataSource(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java
new file mode 100644
index 000000000..4aa016575
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java
@@ -0,0 +1,5 @@
+package org.onap.ccsdk.features.lib.doorman.data;
+
+public enum ActionStatus {
+ PENDING, DONE
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java
new file mode 100644
index 000000000..9960eefbf
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java
@@ -0,0 +1,5 @@
+package org.onap.ccsdk.features.lib.doorman.data;
+
+public enum Event {
+ ARRIVED, COMPLETED, AWAKEN, CHECK
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java
new file mode 100644
index 000000000..1f0b11384
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java
@@ -0,0 +1,33 @@
+package org.onap.ccsdk.features.lib.doorman.data;
+
+import java.util.Date;
+import java.util.List;
+
+public class Message {
+
+ public long messageId;
+ public String extMessageId;
+ public MessageData request;
+ public MessageData response;
+ public Date arrivedTimestamp;
+ public Date startedTimestamp;
+ public Date completedTimestamp;
+ public Date responseTimestamp;
+ public Queue queue;
+
+ public List<MessageStatus> statusHistory;
+ public List<MessageAction> actionHistory;
+
+ @Override
+ public String toString() {
+ StringBuilder ss = new StringBuilder();
+ ss.append(messageId);
+ if (extMessageId != null) {
+ ss.append("::").append(extMessageId);
+ }
+ if (queue != null) {
+ ss.append("::").append(queue);
+ }
+ return ss.toString();
+ }
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java
new file mode 100644
index 000000000..319fc68ff
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java
@@ -0,0 +1,59 @@
+package org.onap.ccsdk.features.lib.doorman.data;
+
+import java.util.Date;
+
+public class MessageAction {
+
+ public long actionId;
+ public MessageActionValue action;
+ public ActionStatus actionStatus;
+ public String resolution;
+ public Date timestamp;
+ public Date doneTimestamp;
+ public int holdTime; // in seconds
+ public MessageData returnResponse;
+
+ @Override
+ public String toString() {
+ return action.toString() + " | " + actionStatus + " | " + resolution + " | " + holdTime + " | " + returnResponse;
+ }
+
+ public boolean same(MessageAction a2) {
+ if (action != a2.action) {
+ return false;
+ }
+ if (action == MessageActionValue.HOLD || action == MessageActionValue.RETURN_HOLD) {
+ if (holdTime != a2.holdTime) {
+ return false;
+ }
+ }
+ if (action == MessageActionValue.RETURN_COMPLETE || action == MessageActionValue.RETURN_HOLD || action == MessageActionValue.RETURN_PROCESS) {
+ if (returnResponse == null != (a2.returnResponse == null)) {
+ return false;
+ }
+ if (returnResponse != null) {
+ if (returnResponse.param == null != (a2.returnResponse.param == null)) {
+ return false;
+ }
+ if (returnResponse.param != null && !returnResponse.param.equals(a2.returnResponse.param)) {
+ return false;
+ }
+ if (returnResponse.body == null != (a2.returnResponse.body == null)) {
+ return false;
+ }
+ if (returnResponse.body != null && !returnResponse.body.equals(a2.returnResponse.body)) {
+ return false;
+ }
+ }
+ }
+ if (action == MessageActionValue.RETURN_COMPLETE) {
+ if (resolution == null != (a2.resolution == null)) {
+ return false;
+ }
+ if (resolution != null && !resolution.equals(a2.resolution)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java
new file mode 100644
index 000000000..fd86d37c8
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java
@@ -0,0 +1,5 @@
+package org.onap.ccsdk.features.lib.doorman.data;
+
+public enum MessageActionValue {
+ HOLD, PROCESS, RETURN_COMPLETE, RETURN_PROCESS, RETURN_HOLD
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java
new file mode 100644
index 000000000..1fa5f9a14
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java
@@ -0,0 +1,21 @@
+package org.onap.ccsdk.features.lib.doorman.data;
+
+import java.util.Map;
+
+public class MessageData {
+
+ public Map<String, Object> param;
+ public String body;
+
+ @Override
+ public String toString() {
+ StringBuilder ss = new StringBuilder();
+ ss.append(param);
+ String b = body;
+ if (b != null && b.length() > 20) {
+ b = b.substring(0, 20) + "...";
+ }
+ ss.append(b);
+ return ss.toString();
+ }
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java
new file mode 100644
index 000000000..e9af20be1
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java
@@ -0,0 +1,9 @@
+package org.onap.ccsdk.features.lib.doorman.data;
+
+import java.util.Date;
+
+public class MessageStatus {
+
+ public MessageStatusValue status;
+ public Date timestamp;
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java
new file mode 100644
index 000000000..af2bbf024
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java
@@ -0,0 +1,5 @@
+package org.onap.ccsdk.features.lib.doorman.data;
+
+public enum MessageStatusValue {
+ ARRIVED, IN_QUEUE, PROCESSING_SYNC, PROCESSING_ASYNC, COMPLETED
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java
new file mode 100644
index 000000000..7fd83104a
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java
@@ -0,0 +1,12 @@
+package org.onap.ccsdk.features.lib.doorman.data;
+
+public class Queue {
+
+ public String type;
+ public String id;
+
+ @Override
+ public String toString() {
+ return type + "::" + id;
+ }
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java
new file mode 100644
index 000000000..a81942771
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java
@@ -0,0 +1,357 @@
+package org.onap.ccsdk.features.lib.doorman.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.onap.ccsdk.features.lib.doorman.MessageQueueHandler;
+import org.onap.ccsdk.features.lib.doorman.data.Event;
+import org.onap.ccsdk.features.lib.doorman.data.Message;
+import org.onap.ccsdk.features.lib.doorman.data.MessageAction;
+import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue;
+import org.onap.ccsdk.features.lib.doorman.data.MessageData;
+import org.onap.ccsdk.features.lib.doorman.data.MessageStatus;
+import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageHandlerBaseImpl implements MessageQueueHandler {
+
+ private static final Logger log = LoggerFactory.getLogger(MessageHandlerBaseImpl.class);
+
+ private boolean async = false;
+ private int maxParallelCount = 1;
+ private int maxQueueSize = 0;
+ private int timeToLive = 0; // in seconds
+ private int maxTimeInQueue = 60; // in seconds
+ private int updateWaitTime = 60; // in seconds
+
+ @Override
+ public Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue) {
+ long t1 = System.currentTimeMillis();
+ log.info(">>>>> Handler started for message: " + msg + ": " + event);
+
+ PrioritizedMessage pmsg = null;
+ List<PrioritizedMessage> processing = new ArrayList<>();
+ List<PrioritizedMessage> waiting = new ArrayList<>();
+ Date now = new Date();
+ for (Message m : queue) {
+ PrioritizedMessage pm = new PrioritizedMessage();
+ pm.message = m;
+ pm.priority = assignPriority(msg);
+ pm.timeInQueue = getTimeInQueue(now, m);
+ pm.updateGroup = determineUpdateGroup(m);
+ pm.updateSequence = determineUpdateSequence(m);
+
+ if (pm.message.messageId == msg.messageId) {
+ pmsg = pm;
+
+ if (event != Event.COMPLETED) {
+ waiting.add(pm);
+ }
+ } else {
+ MessageStatusValue s = m.statusHistory.get(0).status;
+ if (s == MessageStatusValue.IN_QUEUE) {
+ waiting.add(pm);
+ } else if (s == MessageStatusValue.PROCESSING_SYNC || s == MessageStatusValue.PROCESSING_ASYNC) {
+ processing.add(pm);
+ }
+ }
+ }
+
+ log.info(" Processing:");
+ for (PrioritizedMessage pm : processing) {
+ log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup + " | " + pm.updateSequence);
+ }
+ log.info(" Waiting:");
+ for (PrioritizedMessage pm : waiting) {
+ log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup + " | " + pm.updateSequence);
+ }
+
+ log.info(" Determined actions:");
+
+ Collections.sort(waiting, (pm1, pm2) -> comparePrioritizedMessages(pm1, pm2));
+
+ Map<Long, MessageAction> mm = new HashMap<>();
+
+ List<PrioritizedMessage> skipList = findMessagesToSkip(processing, waiting);
+ addNextActionComplete(mm, skipList, Resolution.SKIPPED, now);
+
+ waiting.removeAll(skipList);
+
+ List<PrioritizedMessage> expiredList = findExpiredMessages(processing, waiting);
+ addNextActionComplete(mm, expiredList, Resolution.EXPIRED, now);
+
+ waiting.removeAll(expiredList);
+
+ List<PrioritizedMessage> dropList = findMessagesToDrop(processing, waiting);
+ addNextActionComplete(mm, dropList, Resolution.DROPPED, now);
+
+ waiting.removeAll(dropList);
+
+ List<PrioritizedMessage> processList = findMessagesToProcess(processing, waiting);
+ for (PrioritizedMessage pm : processList) {
+ MessageAction a = new MessageAction();
+ a.timestamp = now;
+ if (async) {
+ a.action = MessageActionValue.RETURN_PROCESS;
+ a.returnResponse = ackResponse(pm.message);
+ } else {
+ a.action = MessageActionValue.PROCESS;
+ }
+ mm.put(pm.message.messageId, a);
+ log.info(" -- Next action for message: " + pm.message + ": " + a.action);
+ }
+
+ if (event != Event.COMPLETED && !mm.containsKey(pmsg.message.messageId)) {
+ MessageAction a = new MessageAction();
+ a.timestamp = now;
+ a.holdTime = pmsg.updateGroup != null ? updateWaitTime : maxTimeInQueue;
+ if (async) {
+ a.action = MessageActionValue.RETURN_HOLD;
+ a.returnResponse = ackResponse(pmsg.message);
+ } else {
+ a.action = MessageActionValue.HOLD;
+ }
+ mm.put(pmsg.message.messageId, a);
+ log.info(" -- Next action for message: " + pmsg.message + ": " + a.action + ": " + a.holdTime);
+
+ waiting.remove(pmsg);
+ }
+
+ if (event == Event.COMPLETED) {
+ MessageAction a = new MessageAction();
+ a.timestamp = now;
+ a.action = MessageActionValue.RETURN_COMPLETE;
+ a.resolution = Resolution.PROCESSED.toString();
+ a.returnResponse = completeResponse(Resolution.PROCESSED, msg);
+ mm.put(pmsg.message.messageId, a);
+ log.info(" -- Next action for message: " + pmsg.message + ": " + a.action + ": " + a.resolution);
+ }
+
+ long t2 = System.currentTimeMillis();
+ log.info("<<<<< Handler completed for message: " + msg + ": " + event + ": Time: " + (t2 - t1));
+ return mm;
+ }
+
+ private void addNextActionComplete(Map<Long, MessageAction> mm, List<PrioritizedMessage> skipList, Resolution r, Date now) {
+ for (PrioritizedMessage pm : skipList) {
+ MessageAction a = new MessageAction();
+ a.action = MessageActionValue.RETURN_COMPLETE;
+ a.resolution = r.toString();
+ a.timestamp = now;
+ a.returnResponse = completeResponse(r, pm.message);
+ mm.put(pm.message.messageId, a);
+ log.info(" -- Next action for message: " + pm.message + ": " + a.action + ": " + a.resolution);
+ }
+ }
+
+ protected List<PrioritizedMessage> findMessagesToSkip(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) {
+ List<PrioritizedMessage> ll = new ArrayList<>();
+ Map<String, PrioritizedMessage> lastMap = new HashMap<>();
+ for (PrioritizedMessage pm : waiting) {
+ if (pm.updateGroup != null) {
+ PrioritizedMessage last = lastMap.get(pm.updateGroup);
+ if (last == null) {
+ lastMap.put(pm.updateGroup, pm);
+ } else {
+ if (pm.updateSequence > last.updateSequence) {
+ ll.add(last);
+ lastMap.put(pm.updateGroup, pm);
+ }
+ }
+ }
+ }
+ return ll;
+ }
+
+ protected List<PrioritizedMessage> findExpiredMessages(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) {
+ List<PrioritizedMessage> ll = new ArrayList<>();
+ if (timeToLive > 0) {
+ for (PrioritizedMessage pm : waiting) {
+ if (pm.timeInQueue > timeToLive) {
+ ll.add(pm);
+ }
+ }
+ }
+ return ll;
+ }
+
+ protected List<PrioritizedMessage> findMessagesToDrop(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) {
+ List<PrioritizedMessage> ll = new ArrayList<>();
+ if (maxQueueSize > 0) {
+ // Drop the least important messages (last in the prioritized waiting list)
+ for (int i = maxQueueSize; i < waiting.size(); i++) {
+ ll.add(waiting.get(i));
+ }
+ }
+ return ll;
+ }
+
+ protected List<PrioritizedMessage> findMessagesToProcess(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) {
+ List<PrioritizedMessage> ll = new ArrayList<>();
+
+ if (processing.size() >= maxParallelCount) {
+ return ll;
+ }
+
+ // Find messages allowed to be processed based on the concurrency rules
+
+ List<List<String>> currentLocks = new ArrayList<>();
+ for (PrioritizedMessage m : processing) {
+ List<List<String>> locks = determineConcurency(m.message);
+ if (locks != null) {
+ currentLocks.addAll(locks);
+ }
+ }
+
+ List<PrioritizedMessage> allowed = new ArrayList<>();
+ for (PrioritizedMessage m : waiting) {
+ List<List<String>> neededLocks = determineConcurency(m.message);
+ if (allowed(currentLocks, neededLocks)) {
+ allowed.add(m);
+ }
+ }
+
+ // Remove messages that are waiting on hold
+ Iterator<PrioritizedMessage> ii = allowed.iterator();
+ while (ii.hasNext()) {
+ PrioritizedMessage pm = ii.next();
+ if (pm.updateGroup != null) {
+ log.info(" --- Check: " + pm.message + ": " + pm.timeInQueue + " >= " + updateWaitTime);
+ if (pm.timeInQueue < updateWaitTime) {
+ ii.remove();
+ }
+ }
+ }
+
+ // Limit the number of processing messages to maxParallelCount
+
+ for (int i = 0; i < allowed.size() && i < maxParallelCount - processing.size(); i++) {
+ ll.add(allowed.get(i));
+ }
+
+ return ll;
+ }
+
+ private int comparePrioritizedMessages(PrioritizedMessage pm1, PrioritizedMessage pm2) {
+ if (pm1.timeInQueue >= maxTimeInQueue && pm2.timeInQueue < maxTimeInQueue) {
+ return -1;
+ }
+ if (pm1.timeInQueue < maxTimeInQueue && pm2.timeInQueue >= maxTimeInQueue) {
+ return 1;
+ }
+ if (pm1.priority < pm2.priority) {
+ return -1;
+ }
+ if (pm1.priority > pm2.priority) {
+ return 1;
+ }
+ if (pm1.timeInQueue > pm2.timeInQueue) {
+ return -1;
+ }
+ if (pm1.timeInQueue < pm2.timeInQueue) {
+ return 1;
+ }
+ return 0;
+ }
+
+ private int getTimeInQueue(Date now, Message m) {
+ // Find the elapsed time since message arrived. That will be the timestamp of the first
+ // status of the message (last status in the status history list)
+ MessageStatus receivedStatus = m.statusHistory.get(m.statusHistory.size() - 1);
+ return (int) ((now.getTime() - receivedStatus.timestamp.getTime()) / 1000);
+ }
+
+ private boolean allowed(List<List<String>> currentLocks, List<List<String>> neededLocks) {
+ if (neededLocks == null) {
+ return true;
+ }
+ for (List<String> neededLockLevels : neededLocks) {
+ for (List<String> currentLockLevels : currentLocks) {
+ int n = neededLockLevels.size() < currentLockLevels.size() ? neededLockLevels.size() : currentLockLevels.size();
+ boolean good = false;
+ for (int i = 0; i < n; i++) {
+ if (!neededLockLevels.get(i).equals(currentLockLevels.get(i))) {
+ good = true;
+ break;
+ }
+ }
+ if (!good) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ protected void initMessage(Message msg) {
+ }
+
+ protected void closeMessage(Message msg) {
+ }
+
+ protected long assignPriority(Message msg) {
+ return msg.messageId; // FIFO by default
+ }
+
+ protected String determineUpdateGroup(Message msg) {
+ return null;
+ }
+
+ protected long determineUpdateSequence(Message msg) {
+ return msg.messageId;
+ }
+
+ protected List<List<String>> determineConcurency(Message msg) {
+ return null;
+ }
+
+ protected MessageData ackResponse(Message msg) {
+ return null;
+ }
+
+ protected MessageData completeResponse(Resolution r, Message msg) {
+ return null;
+ }
+
+ protected static enum Resolution {
+ PROCESSED, SKIPPED, EXPIRED, DROPPED
+ }
+
+ protected static class PrioritizedMessage {
+
+ public Message message;
+ public long priority;
+ public int timeInQueue;
+ public String updateGroup;
+ public long updateSequence;
+ }
+
+ public void setMaxParallelCount(int maxParallelCount) {
+ this.maxParallelCount = maxParallelCount;
+ }
+
+ public void setTimeToLive(int timeToLive) {
+ this.timeToLive = timeToLive;
+ }
+
+ public void setMaxTimeInQueue(int maxTimeInQueue) {
+ this.maxTimeInQueue = maxTimeInQueue;
+ }
+
+ public void setUpdateWaitTime(int updateWaitTime) {
+ this.updateWaitTime = updateWaitTime;
+ }
+
+ public void setMaxQueueSize(int maxQueueSize) {
+ this.maxQueueSize = maxQueueSize;
+ }
+
+ public void setAsync(boolean async) {
+ this.async = async;
+ }
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java
new file mode 100644
index 000000000..61059e067
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java
@@ -0,0 +1,57 @@
+package org.onap.ccsdk.features.lib.doorman.impl;
+
+import java.util.Map;
+import org.onap.ccsdk.features.lib.doorman.MessageClassifier;
+import org.onap.ccsdk.features.lib.doorman.MessageInterceptor;
+import org.onap.ccsdk.features.lib.doorman.MessageInterceptorFactory;
+import org.onap.ccsdk.features.lib.doorman.MessageProcessor;
+import org.onap.ccsdk.features.lib.doorman.MessageQueueHandler;
+import org.onap.ccsdk.features.lib.doorman.dao.MessageDao;
+import org.onap.ccsdk.features.lib.rlock.LockHelper;
+
+public class MessageInterceptorFactoryImpl implements MessageInterceptorFactory {
+
+ private MessageClassifier messageClassifier;
+ private Map<String, MessageQueueHandler> handlerMap;
+ private MessageProcessor messageProcessor;
+ private MessageDao messageDao;
+
+ private LockHelper lockHelper;
+ private int lockTimeout; // in seconds
+
+ @Override
+ public MessageInterceptor create() {
+ MessageInterceptorImpl mi = new MessageInterceptorImpl();
+ mi.setMessageClassifier(messageClassifier);
+ mi.setHandlerMap(handlerMap);
+ mi.setMessageProcessor(messageProcessor);
+ mi.setMessageDao(messageDao);
+ mi.setLockHelper(lockHelper);
+ mi.setLockTimeout(lockTimeout);
+ return mi;
+ }
+
+ public void setMessageDao(MessageDao messageDao) {
+ this.messageDao = messageDao;
+ }
+
+ public void setLockHelper(LockHelper lockHelper) {
+ this.lockHelper = lockHelper;
+ }
+
+ public void setLockTimeout(int lockTimeout) {
+ this.lockTimeout = lockTimeout;
+ }
+
+ public void setMessageClassifier(MessageClassifier messageClassifier) {
+ this.messageClassifier = messageClassifier;
+ }
+
+ public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) {
+ this.handlerMap = handlerMap;
+ }
+
+ public void setMessageProcessor(MessageProcessor messageProcessor) {
+ this.messageProcessor = messageProcessor;
+ }
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java
new file mode 100644
index 000000000..d2ab97101
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java
@@ -0,0 +1,336 @@
+package org.onap.ccsdk.features.lib.doorman.impl;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.onap.ccsdk.features.lib.doorman.MessageClassifier;
+import org.onap.ccsdk.features.lib.doorman.MessageInterceptor;
+import org.onap.ccsdk.features.lib.doorman.MessageProcessor;
+import org.onap.ccsdk.features.lib.doorman.MessageQueueHandler;
+import org.onap.ccsdk.features.lib.doorman.dao.MessageDao;
+import org.onap.ccsdk.features.lib.doorman.data.ActionStatus;
+import org.onap.ccsdk.features.lib.doorman.data.Event;
+import org.onap.ccsdk.features.lib.doorman.data.Message;
+import org.onap.ccsdk.features.lib.doorman.data.MessageAction;
+import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue;
+import org.onap.ccsdk.features.lib.doorman.data.MessageData;
+import org.onap.ccsdk.features.lib.doorman.data.MessageStatus;
+import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue;
+import org.onap.ccsdk.features.lib.doorman.data.Queue;
+import org.onap.ccsdk.features.lib.rlock.LockHelper;
+import org.onap.ccsdk.features.lib.rlock.SynchronizedFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageInterceptorImpl implements MessageInterceptor {
+
+ private static final Logger log = LoggerFactory.getLogger(MessageInterceptorImpl.class);
+
+ private MessageClassifier messageClassifier;
+ private Map<String, MessageQueueHandler> handlerMap;
+ private MessageProcessor messageProcessor;
+ private MessageDao messageDao;
+
+ private LockHelper lockHelper;
+ private int lockTimeout = 10; // in seconds
+
+ private Message message = null;
+ private MessageQueueHandler handler = null;
+
+ @Override
+ public MessageData processRequest(MessageData request) {
+ Date now = new Date();
+
+ Queue q = null;
+ String extMessageId = null;
+ if (messageClassifier != null) {
+ q = messageClassifier.determineQueue(request);
+ extMessageId = messageClassifier.getExtMessageId(request);
+ }
+
+ long id = messageDao.addArrivedMessage(extMessageId, request, q, now);
+
+ MessageStatus arrivedStatus = new MessageStatus();
+ arrivedStatus.status = MessageStatusValue.ARRIVED;
+ arrivedStatus.timestamp = now;
+ messageDao.addStatus(id, arrivedStatus);
+
+ message = new Message();
+ message.messageId = id;
+ message.request = request;
+ message.arrivedTimestamp = now;
+ message.queue = q;
+ message.extMessageId = extMessageId;
+
+ log.info("Message received: " + message);
+
+ if (q != null && handlerMap != null) {
+ handler = handlerMap.get(q.type);
+ }
+
+ if (q == null || handler == null) {
+ processSync();
+ return null; // Do nothing, normal message processing
+ }
+
+ Event event = Event.ARRIVED;
+
+ while (true) {
+ MessageFunction func = new MessageFunction(event);
+ func.exec();
+
+ MessageAction nextAction = func.getNextAction();
+ if (nextAction == null) {
+ processSync();
+ return null;
+ }
+
+ switch (nextAction.action) {
+
+ case PROCESS:
+ processSync();
+ return null;
+
+ case HOLD: {
+ event = waitForNewAction(nextAction.holdTime);
+ break;
+ }
+
+ case RETURN_COMPLETE:
+ returnComplete(nextAction);
+ return nextAction.returnResponse;
+
+ case RETURN_PROCESS:
+ processAsync(nextAction.returnResponse);
+ return nextAction.returnResponse;
+
+ case RETURN_HOLD: {
+ returnHold(nextAction);
+ return nextAction.returnResponse;
+ }
+ }
+ }
+ }
+
+ private void processSync() {
+ messageDao.updateMessageStarted(message.messageId, new Date());
+ log.info("Message processing started: " + message);
+ }
+
+ private void processAsync(MessageData returnResponse) {
+ Thread t = new Thread(() -> processMessage(message.request), message.queue.type + "::" + message.queue.id + "::" + message.messageId);
+ t.start();
+
+ messageDao.updateMessageResponse(message.messageId, new Date(), returnResponse);
+ }
+
+ private void processMessage(MessageData request) {
+ messageDao.updateMessageStarted(message.messageId, new Date());
+ log.info("Message processing started: " + message);
+ if (messageProcessor != null) {
+ messageProcessor.processMessage(request);
+ }
+
+ MessageFunction func = new MessageFunction(Event.COMPLETED);
+ func.exec();
+ MessageAction nextAction = func.getNextAction();
+
+ messageDao.updateMessageCompleted(message.messageId, nextAction.resolution, new Date());
+ log.info("Message processing completed: " + message);
+ }
+
+ private void returnComplete(MessageAction nextAction) {
+ Date now = new Date();
+ messageDao.updateMessageResponse(message.messageId, now, nextAction.returnResponse);
+ messageDao.updateMessageCompleted(message.messageId, nextAction.resolution, now);
+ log.info("Message processing completed: " + message);
+ }
+
+ private void returnHold(MessageAction nextAction) {
+ Thread t = new Thread(() -> asyncQueue(nextAction), message.queue.type + "::" + message.queue.id + "::" + message.messageId);
+ t.start();
+ messageDao.updateMessageResponse(message.messageId, new Date(), nextAction.returnResponse);
+ }
+
+ private void asyncQueue(MessageAction nextAction) {
+ Event event = waitForNewAction(nextAction.holdTime);
+
+ while (true) {
+ MessageFunction func = new MessageFunction(event);
+ func.exec();
+
+ nextAction = func.getNextAction();
+ if (nextAction == null) {
+ processMessage(message.request);
+ return;
+ }
+
+ switch (nextAction.action) {
+ case PROCESS:
+ processMessage(message.request);
+ return;
+
+ case HOLD: {
+ event = waitForNewAction(nextAction.holdTime);
+ break;
+ }
+
+ default:
+ return;
+ }
+ }
+ }
+
+ private Event waitForNewAction(int holdTime) {
+ long startTime = System.currentTimeMillis();
+ long currentTime = startTime;
+ while (currentTime - startTime <= (holdTime + 1) * 1000) {
+ try {
+ Thread.sleep(5000);
+ } catch (Exception e) {
+ }
+
+ MessageAction nextAction = messageDao.getNextAction(message.messageId);
+ if (nextAction != null && nextAction.action != MessageActionValue.HOLD) {
+ return Event.AWAKEN;
+ }
+
+ currentTime = System.currentTimeMillis();
+ }
+ return Event.CHECK;
+ }
+
+ @Override
+ public void processResponse(MessageData response) {
+ if (message == null) {
+ return;
+ }
+
+ String resolution = null;
+ if (message.queue != null && handler != null) {
+ MessageFunction func = new MessageFunction(Event.COMPLETED);
+ func.exec();
+ MessageAction nextAction = func.getNextAction();
+ if (nextAction != null) {
+ resolution = nextAction.resolution;
+ }
+ }
+
+ Date now = new Date();
+ messageDao.updateMessageResponse(message.messageId, now, response);
+ messageDao.updateMessageCompleted(message.messageId, resolution, now);
+ log.info("Message processing completed: " + message);
+ }
+
+ private class MessageFunction extends SynchronizedFunction {
+
+ private Event event;
+
+ private MessageAction nextAction = null; // Output
+
+ public MessageFunction(Event event) {
+ super(lockHelper, Collections.singleton(message.queue.type + "::" + message.queue.id), lockTimeout);
+ this.event = event;
+ }
+
+ @Override
+ protected void _exec() {
+ List<Message> messageQueue = messageDao.readMessageQueue(message.queue);
+ if (event == Event.AWAKEN) {
+ for (Message m : messageQueue) {
+ if (m.messageId == message.messageId) {
+ nextAction = m.actionHistory.get(0);
+ }
+ }
+ if (nextAction != null) {
+ messageDao.updateActionDone(nextAction.actionId, new Date());
+ }
+ } else {
+ Map<Long, MessageAction> nextActionMap = handler.nextAction(event, message, messageQueue);
+ if (nextActionMap != null) {
+ for (Message m : messageQueue) {
+ MessageAction action = nextActionMap.get(m.messageId);
+ if (action != null) {
+ if (m.messageId == message.messageId) {
+ action.actionStatus = ActionStatus.DONE;
+ action.doneTimestamp = new Date();
+ messageDao.addAction(m.messageId, action);
+ nextAction = action;
+ } else {
+ MessageAction lastAction = m.actionHistory.get(0);
+ if (lastAction.actionStatus != ActionStatus.PENDING || !action.same(lastAction)) {
+ action.actionStatus = ActionStatus.PENDING;
+ messageDao.addAction(m.messageId, action);
+ }
+ }
+ }
+ }
+ }
+ }
+ if (nextAction != null) {
+ log.info("Next message action: " + message + ":" + nextAction.action);
+ MessageStatus status = determineStatus(nextAction);
+ if (status != null) {
+ messageDao.addStatus(message.messageId, status);
+ log.info("Updating message status: " + message + ":" + status.status);
+ }
+ }
+ }
+
+ public MessageAction getNextAction() {
+ return nextAction;
+ }
+ }
+
+ private MessageStatus determineStatus(MessageAction action) {
+ if (action == null) {
+ return null;
+ }
+
+ MessageStatus s = new MessageStatus();
+ s.timestamp = new Date();
+
+ switch (action.action) {
+ case PROCESS:
+ s.status = MessageStatusValue.PROCESSING_SYNC;
+ break;
+ case HOLD:
+ case RETURN_HOLD:
+ s.status = MessageStatusValue.IN_QUEUE;
+ break;
+ case RETURN_PROCESS:
+ s.status = MessageStatusValue.PROCESSING_ASYNC;
+ break;
+ case RETURN_COMPLETE:
+ s.status = MessageStatusValue.COMPLETED;
+ break;
+ }
+
+ return s;
+ }
+
+ public void setMessageDao(MessageDao messageDao) {
+ this.messageDao = messageDao;
+ }
+
+ public void setLockHelper(LockHelper lockHelper) {
+ this.lockHelper = lockHelper;
+ }
+
+ public void setLockTimeout(int lockTimeout) {
+ this.lockTimeout = lockTimeout;
+ }
+
+ public void setMessageClassifier(MessageClassifier messageClassifier) {
+ this.messageClassifier = messageClassifier;
+ }
+
+ public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) {
+ this.handlerMap = handlerMap;
+ }
+
+ public void setMessageProcessor(MessageProcessor messageProcessor) {
+ this.messageProcessor = messageProcessor;
+ }
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java
new file mode 100644
index 000000000..73c3d8b6a
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java
@@ -0,0 +1,236 @@
+package org.onap.ccsdk.features.lib.doorman.servlet;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.CharArrayWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletRequestWrapper;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpServletResponseWrapper;
+import org.onap.ccsdk.features.lib.doorman.MessageInterceptor;
+import org.onap.ccsdk.features.lib.doorman.MessageInterceptorFactory;
+import org.onap.ccsdk.features.lib.doorman.data.MessageData;
+
+public class MessageInterceptorFilter implements Filter {
+
+ private MessageInterceptorFactory messageInterceptorFactory;
+
+ @Override
+ public void init(FilterConfig filterConfig) throws ServletException {
+ }
+
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
+ RequestWrapper req = new RequestWrapper((HttpServletRequest) request);
+ MessageData requestData = getMessageRequest(req);
+
+ MessageInterceptor interceptor = messageInterceptorFactory.create();
+ MessageData responseData = interceptor.processRequest(requestData);
+
+ if (responseData == null) {
+ ResponseWrapper res = new ResponseWrapper((HttpServletResponse) response);
+
+ chain.doFilter(req, res);
+
+ responseData = res.getMessageResponse();
+ interceptor.processResponse(responseData);
+ }
+
+ setMessageResponse((HttpServletResponse) response, responseData);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void setMessageResponse(HttpServletResponse response, MessageData responseData) throws IOException {
+ if (responseData.param != null) {
+ String contentType = (String) responseData.param.get("content_type");
+ if (contentType != null) {
+ response.setContentType(contentType);
+ }
+ Integer httpCode = (Integer) responseData.param.get("http_code");
+ if (httpCode != null) {
+ response.setStatus(httpCode);
+ }
+ Map<String, Object> headers = (Map<String, Object>) responseData.param.get("headers");
+ if (headers != null) {
+ for (Entry<String, Object> entry : headers.entrySet()) {
+ String name = entry.getKey();
+ Object v = entry.getValue();
+ if (v instanceof List) {
+ List<Object> ll = (List<Object>) v;
+ for (Object o : ll) {
+ response.addHeader(name, o.toString());
+ }
+ } else {
+ response.setHeader(name, v.toString());
+ }
+ }
+ }
+ }
+
+ if (responseData.body != null) {
+ response.setContentLength(responseData.body.length());
+ response.getWriter().write(responseData.body);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private MessageData getMessageRequest(RequestWrapper request) {
+ MessageData requestData = new MessageData();
+ requestData.param = new HashMap<>();
+ requestData.param.put("http_method", request.getMethod());
+ requestData.param.put("uri", request.getPathInfo());
+ requestData.param.put("param", request.getParameterMap());
+ requestData.param.put("content_type", request.getContentType());
+
+ Map<String, Object> headers = new HashMap<>();
+ Enumeration<String> headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String name = headerNames.nextElement();
+ Enumeration<String> values = request.getHeaders(name);
+ List<String> valueList = new ArrayList<>();
+ while (values.hasMoreElements()) {
+ valueList.add(values.nextElement());
+ }
+ if (valueList.size() > 1) {
+ headers.put(name, valueList);
+ } else {
+ headers.put(name, valueList.get(0));
+ }
+ }
+ requestData.param.put("headers", headers);
+
+ requestData.body = request.getBody();
+
+ return requestData;
+ }
+
+ @Override
+ public void destroy() {
+ }
+
+ private static class RequestWrapper extends HttpServletRequestWrapper {
+
+ private final String body;
+
+ public RequestWrapper(HttpServletRequest request) throws IOException {
+ super(request);
+
+ StringBuilder stringBuilder = new StringBuilder();
+ try (InputStream inputStream = request.getInputStream()) {
+ if (inputStream != null) {
+ try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
+ char[] charBuffer = new char[128];
+ int bytesRead = -1;
+ while ((bytesRead = bufferedReader.read(charBuffer)) > 0) {
+ stringBuilder.append(charBuffer, 0, bytesRead);
+ }
+ }
+ }
+ }
+ body = stringBuilder.toString();
+ }
+
+ @Override
+ public ServletInputStream getInputStream() throws IOException {
+ final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body.getBytes());
+ ServletInputStream servletInputStream = new ServletInputStream() {
+
+ @Override
+ public int read() throws IOException {
+ return byteArrayInputStream.read();
+ }
+ };
+ return servletInputStream;
+ }
+
+ @Override
+ public BufferedReader getReader() throws IOException {
+ return new BufferedReader(new InputStreamReader(getInputStream()));
+ }
+
+ public String getBody() {
+ return body;
+ }
+ }
+
+ public class ResponseWrapper extends HttpServletResponseWrapper {
+
+ private CharArrayWriter writer = new CharArrayWriter();
+ private Map<String, Object> param = new HashMap<>();
+
+ public ResponseWrapper(HttpServletResponse response) {
+ super(response);
+ }
+
+ @Override
+ public PrintWriter getWriter() {
+ return new PrintWriter(writer);
+ }
+
+ @Override
+ public void setStatus(int status) {
+ param.put("http_code", status);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setHeader(String name, String value) {
+ Map<String, Object> headers = (Map<String, Object>) param.get("headers");
+ if (headers == null) {
+ headers = new HashMap<>();
+ param.put("headers", headers);
+ }
+ headers.put(name, value);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void addHeader(String name, String value) {
+ Map<String, Object> headers = (Map<String, Object>) param.get("headers");
+ if (headers == null) {
+ headers = new HashMap<>();
+ param.put("headers", headers);
+ }
+ Object v = headers.get(name);
+ if (v == null) {
+ headers.put(name, value);
+ } else if (v instanceof List) {
+ ((List<Object>) v).add(value);
+ } else {
+ List<Object> ll = new ArrayList<>();
+ ll.add(v);
+ ll.add(value);
+ headers.put(name, ll);
+ }
+ }
+
+ public MessageData getMessageResponse() {
+ MessageData responseData = new MessageData();
+ responseData.param = param;
+ responseData.body = writer.toString();
+ return responseData;
+ }
+ }
+
+ public void setMessageInterceptorFactory(MessageInterceptorFactory messageInterceptorFactory) {
+ this.messageInterceptorFactory = messageInterceptorFactory;
+ }
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java
new file mode 100644
index 000000000..42f1b21e7
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java
@@ -0,0 +1,273 @@
+package org.onap.ccsdk.features.lib.doorman.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataUtil {
+
+ @SuppressWarnings("unused")
+ private static final Logger log = LoggerFactory.getLogger(DataUtil.class);
+
+ @SuppressWarnings("unchecked")
+ public static Object get(Object struct, String compositeKey) {
+ if (struct == null || compositeKey == null) {
+ return null;
+ }
+
+ List<Object> keys = splitCompositeKey(compositeKey);
+ Object currentValue = struct;
+ String currentKey = "";
+
+ for (Object key : keys) {
+ if (key instanceof Integer) {
+ if (!(currentValue instanceof List)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list");
+ }
+
+ Integer keyi = (Integer) key;
+ List<Object> currentValueL = (List<Object>) currentValue;
+
+ if (keyi >= currentValueL.size()) {
+ return null;
+ }
+
+ currentValue = currentValueL.get(keyi);
+ if (currentValue == null) {
+ return null;
+ }
+
+ currentKey += "[" + key + "]";
+ } else {
+ if (!(currentValue instanceof Map)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map");
+ }
+
+ currentValue = ((Map<String, Object>) currentValue).get(key);
+ if (currentValue == null) {
+ return null;
+ }
+
+ currentKey += "." + key;
+ }
+ }
+ return currentValue;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void set(Object struct, String compositeKey, Object value) {
+ if (struct == null) {
+ throw new IllegalArgumentException("Null argument: struct");
+ }
+
+ if (compositeKey == null || compositeKey.length() == 0) {
+ throw new IllegalArgumentException("Null or empty argument: compositeKey");
+ }
+
+ if (value == null) {
+ return;
+ }
+
+ List<Object> keys = splitCompositeKey(compositeKey);
+ Object currentValue = struct;
+ String currentKey = "";
+
+ for (int i = 0; i < keys.size() - 1; i++) {
+ Object key = keys.get(i);
+
+ if (key instanceof Integer) {
+ if (!(currentValue instanceof List)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list");
+ }
+
+ Integer keyi = (Integer) key;
+ List<Object> currentValueL = (List<Object>) currentValue;
+ int size = currentValueL.size();
+
+ if (keyi >= size) {
+ for (int k = 0; k < keyi - size + 1; k++) {
+ currentValueL.add(null);
+ }
+ }
+
+ Object newValue = currentValueL.get(keyi);
+ if (newValue == null) {
+ Object nextKey = keys.get(i + 1);
+ if (nextKey instanceof Integer) {
+ newValue = new ArrayList<>();
+ } else {
+ newValue = new HashMap<>();
+ }
+ currentValueL.set(keyi, newValue);
+ }
+
+ currentValue = newValue;
+ currentKey += "[" + key + "]";
+
+ } else {
+ if (!(currentValue instanceof Map)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map");
+ }
+
+ Object newValue = ((Map<String, Object>) currentValue).get(key);
+ if (newValue == null) {
+ Object nextKey = keys.get(i + 1);
+ if (nextKey instanceof Integer) {
+ newValue = new ArrayList<>();
+ } else {
+ newValue = new HashMap<>();
+ }
+ ((Map<String, Object>) currentValue).put((String) key, newValue);
+ }
+
+ currentValue = newValue;
+ currentKey += "." + key;
+ }
+ }
+
+ Object key = keys.get(keys.size() - 1);
+ if (key instanceof Integer) {
+ if (!(currentValue instanceof List)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list");
+ }
+
+ Integer keyi = (Integer) key;
+ List<Object> currentValueL = (List<Object>) currentValue;
+ int size = currentValueL.size();
+
+ if (keyi >= size) {
+ for (int k = 0; k < keyi - size + 1; k++) {
+ currentValueL.add(null);
+ }
+ }
+
+ currentValueL.set(keyi, value);
+
+ } else {
+ if (!(currentValue instanceof Map)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map");
+ }
+
+ ((Map<String, Object>) currentValue).put((String) key, value);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void delete(Object struct, String compositeKey) {
+ if (struct == null) {
+ throw new IllegalArgumentException("Null argument: struct");
+ }
+
+ if (compositeKey == null) {
+ throw new IllegalArgumentException("Null argument: compositeKey");
+ }
+
+ List<Object> keys = splitCompositeKey(compositeKey);
+ Object currentValue = struct;
+ String currentKey = "";
+
+ for (int i = 0; i < keys.size() - 1; i++) {
+ Object key = keys.get(i);
+
+ if (key instanceof Integer) {
+ if (!(currentValue instanceof List)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list");
+ }
+
+ Integer keyi = (Integer) key;
+ List<Object> currentValueL = (List<Object>) currentValue;
+
+ if (keyi >= currentValueL.size()) {
+ return;
+ }
+
+ currentValue = currentValueL.get(keyi);
+ if (currentValue == null) {
+ return;
+ }
+
+ currentKey += "[" + key + "]";
+
+ } else {
+ if (!(currentValue instanceof Map)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map");
+ }
+
+ currentValue = ((Map<String, Object>) currentValue).get(key);
+ if (currentValue == null) {
+ return;
+ }
+
+ currentKey += "." + key;
+ }
+ }
+
+ Object key = keys.get(keys.size() - 1);
+ if (key instanceof Integer) {
+ if (!(currentValue instanceof List)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list");
+ }
+
+ Integer keyi = (Integer) key;
+ List<Object> currentValueL = (List<Object>) currentValue;
+
+ if (keyi < currentValueL.size()) {
+ currentValueL.remove(keyi);
+ }
+
+ } else {
+ if (!(currentValue instanceof Map)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map");
+ }
+
+ ((Map<String, Object>) currentValue).remove(key);
+ }
+ }
+
+ private static List<Object> splitCompositeKey(String compositeKey) {
+ if (compositeKey == null) {
+ return Collections.emptyList();
+ }
+
+ String[] ss = compositeKey.split("\\.");
+ List<Object> ll = new ArrayList<>();
+ for (String s : ss) {
+ if (s.length() == 0) {
+ continue;
+ }
+
+ int i1 = s.indexOf('[');
+ if (i1 < 0) {
+ ll.add(s);
+ } else {
+ if (!s.endsWith("]")) {
+ throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": No matching ] found");
+ }
+
+ String s1 = s.substring(0, i1);
+ if (s1.length() > 0) {
+ ll.add(s1);
+ }
+
+ String s2 = s.substring(i1 + 1, s.length() - 1);
+ try {
+ int n = Integer.parseInt(s2);
+ if (n < 0) {
+ throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": Index must be >= 0: " + n);
+ }
+
+ ll.add(n);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": Index not a number: " + s2);
+ }
+ }
+ }
+
+ return ll;
+ }
+}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java
new file mode 100644
index 000000000..fc9ce0936
--- /dev/null
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java
@@ -0,0 +1,319 @@
+package org.onap.ccsdk.features.lib.doorman.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class JsonUtil {
+
+ public static Object jsonToData(String s) {
+ if (s == null) {
+ return null;
+ }
+ return jsonToData(new Current(s));
+ }
+
+ private static class Current {
+
+ public String s;
+ public int i;
+ public int line, pos;
+
+ public Current(String s) {
+ this.s = s;
+ i = 0;
+ line = 1;
+ pos = 1;
+ }
+
+ public void move() {
+ i++;
+ pos++;
+ }
+
+ public void move(int k) {
+ i += k;
+ pos += k;
+ }
+
+ public boolean end() {
+ return i >= s.length();
+ }
+
+ public char get() {
+ return s.charAt(i);
+ }
+
+ public boolean is(String ss) {
+ return i < s.length() - ss.length() && s.substring(i, i + ss.length()).equals(ss);
+ }
+
+ public boolean is(char c) {
+ return i < s.length() && s.charAt(i) == c;
+ }
+
+ public void skipWhiteSpace() {
+ while (i < s.length() && s.charAt(i) <= 32) {
+ char cc = s.charAt(i);
+ if (cc == '\n') {
+ line++;
+ pos = 1;
+ } else if (cc == ' ' || cc == '\t') {
+ pos++;
+ }
+ i++;
+ }
+ }
+ }
+
+ public static Object jsonToData(Current c) {
+ c.skipWhiteSpace();
+
+ if (c.end()) {
+ return "";
+ }
+
+ char cc = c.get();
+ if (cc == '{') {
+ c.move();
+ return jsonToMap(c);
+ }
+ if (cc == '[') {
+ c.move();
+ return jsonToList(c);
+ }
+ return jsonToObject(c);
+ }
+
+ private static Object jsonToObject(Current c) {
+ if (c.is('"')) {
+ c.move();
+ StringBuilder ss = new StringBuilder();
+ while (!c.end() && c.get() != '"') {
+ if (c.get() == '\\') {
+ c.move();
+ char cc = c.get();
+ switch (cc) {
+ case '\\':
+ ss.append('\\');
+ break;
+ case '"':
+ ss.append('\"');
+ break;
+ case 'n':
+ ss.append('\n');
+ break;
+ case 'r':
+ ss.append('\r');
+ break;
+ case 't':
+ ss.append('\t');
+ break;
+ default:
+ throw new IllegalArgumentException("JSON parsing error: Invalid escaped character at (" + c.line + ':' + c.pos + "): " + cc);
+ }
+ } else {
+ ss.append(c.get());
+ }
+ c.move();
+ }
+ if (!c.end()) {
+ c.move(); // Skip the closing "
+ }
+ return ss.toString();
+ }
+ if (c.is("true")) {
+ c.move(4);
+ return true;
+ }
+ if (c.is("false")) {
+ c.move(5);
+ return false;
+ }
+ if (c.is("null")) {
+ c.move(4);
+ return null;
+ }
+ StringBuilder ss = new StringBuilder();
+ while (!c.end() && c.get() != ',' && c.get() != ']' && c.get() != '}' && c.get() != '\n' && c.get() != '\t') {
+ ss.append(c.get());
+ c.move();
+ }
+ try {
+ return Long.valueOf(ss.toString());
+ } catch (Exception e1) {
+ try {
+ return Double.valueOf(ss.toString());
+ } catch (Exception e2) {
+ throw new IllegalArgumentException("JSON parsing error: Invalid value at (" + c.line + ':' + c.pos + "): " + ss.toString());
+ }
+ }
+ }
+
+ private static List<Object> jsonToList(Current c) {
+ List<Object> ll = new ArrayList<>();
+ c.skipWhiteSpace();
+ while (!c.end() && c.get() != ']') {
+ Object value = jsonToData(c);
+
+ ll.add(value);
+
+ c.skipWhiteSpace();
+ if (c.is(',')) {
+ c.move();
+ c.skipWhiteSpace();
+ }
+ }
+
+ if (!c.end()) {
+ c.move(); // Skip the closing ]
+ }
+
+ return ll;
+ }
+
+ private static Map<String, Object> jsonToMap(Current c) {
+ Map<String, Object> mm = new HashMap<>();
+ c.skipWhiteSpace();
+ while (!c.end() && c.get() != '}') {
+ Object key = jsonToObject(c);
+ if (!(key instanceof String)) {
+ throw new IllegalArgumentException("JSON parsing error: Invalid key at (" + c.line + ':' + c.pos + "): " + key);
+ }
+
+ c.skipWhiteSpace();
+ if (!c.is(':')) {
+ throw new IllegalArgumentException("JSON parsing error: Expected ':' at (" + c.line + ':' + c.pos + ")");
+ }
+
+ c.move(); // Skip the :
+ Object value = jsonToData(c);
+
+ mm.put((String) key, value);
+
+ c.skipWhiteSpace();
+ if (c.is(',')) {
+ c.move();
+ c.skipWhiteSpace();
+ }
+ }
+
+ if (!c.end()) {
+ c.move(); // Skip the closing }
+ }
+
+ return mm;
+ }
+
+ public static String dataToJson(Object o, boolean escape) {
+ StringBuilder sb = new StringBuilder();
+ str(sb, o, 0, false, escape);
+ return sb.toString();
+ }
+
+ public static String dataToJson(Object o) {
+ return dataToJson(o, true);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void str(StringBuilder ss, Object o, int indent, boolean padFirst, boolean escape) {
+ if (o == null || o instanceof Boolean || o instanceof Number) {
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append(o);
+ return;
+ }
+
+ if (o instanceof String) {
+ String s = escape ? escapeJson((String) o) : (String) o;
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append('"').append(s).append('"');
+ return;
+ }
+
+ if (o instanceof Number || o instanceof Boolean) {
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append(o);
+ return;
+ }
+
+ if (o instanceof Map) {
+ Map<String, Object> mm = (Map<String, Object>) o;
+
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append("{\n");
+
+ boolean first = true;
+ for (String k : mm.keySet()) {
+ if (!first) {
+ ss.append(",\n");
+ }
+ first = false;
+
+ Object v = mm.get(k);
+ ss.append(pad(indent + 1));
+ if (escape) {
+ ss.append('"');
+ }
+ ss.append(k);
+ if (escape) {
+ ss.append('"');
+ }
+ ss.append(": ");
+ str(ss, v, indent + 1, false, escape);
+ }
+
+ ss.append("\n");
+ ss.append(pad(indent)).append('}');
+
+ return;
+ }
+
+ if (o instanceof List) {
+ List<Object> ll = (List<Object>) o;
+
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append("[\n");
+
+ boolean first = true;
+ for (Object o1 : ll) {
+ if (!first) {
+ ss.append(",\n");
+ }
+ first = false;
+
+ str(ss, o1, indent + 1, true, escape);
+ }
+
+ ss.append("\n");
+ ss.append(pad(indent)).append(']');
+ }
+ }
+
+ private static String escapeJson(String v) {
+ String s = v.replaceAll("\\\\", "\\\\\\\\");
+ s = s.replaceAll("\"", "\\\\\"");
+ s = s.replaceAll("\n", "\\\\n");
+ s = s.replaceAll("\r", "\\\\r");
+ s = s.replaceAll("\t", "\\\\t");
+ return s;
+ }
+
+ private static String pad(int n) {
+ String s = "";
+ for (int i = 0; i < n; i++) {
+ s += " ";
+ }
+ return s;
+ }
+}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java
new file mode 100644
index 000000000..32e94e5d3
--- /dev/null
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java
@@ -0,0 +1,91 @@
+package org.onap.ccsdk.features.lib.doorman.it;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.onap.ccsdk.features.lib.doorman.data.MessageAction;
+import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue;
+import org.onap.ccsdk.features.lib.doorman.data.MessageStatus;
+
+public class MessageQueueDataItem implements Comparable<MessageQueueDataItem> {
+
+ public Date timeStamp;
+ public String extMessageId;
+ public MessageStatus status;
+ public MessageAction action;
+
+ @Override
+ public int compareTo(MessageQueueDataItem other) {
+ int c = timeStamp.compareTo(other.timeStamp);
+ if (c == 0) {
+ if (action != null && other.status != null) {
+ return -1;
+ }
+ if (status != null && other.action != null) {
+ return 1;
+ }
+ }
+ return c;
+ }
+
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(extMessageId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || !(o instanceof MessageQueueDataItem)) {
+ return false;
+ }
+ MessageQueueDataItem other = (MessageQueueDataItem) o;
+ if (!extMessageId.equals(other.extMessageId)) {
+ return false;
+ }
+ if (status != null && (other.status == null || status.status != other.status.status)) {
+ return false;
+ }
+ if (action != null) {
+ if (other.action == null || action.action != other.action.action) {
+ return false;
+ }
+ if (action.action == MessageActionValue.HOLD || action.action == MessageActionValue.RETURN_HOLD) {
+ if (action.holdTime != other.action.holdTime) {
+ return false;
+ }
+ } else if (action.action == MessageActionValue.RETURN_COMPLETE) {
+ if (!action.resolution.equals(other.action.resolution)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder ss = new StringBuilder();
+ if (timeStamp != null) {
+ ss.append(df.format(timeStamp)).append(" | ");
+ }
+ if (status != null) {
+ ss.append("STATUS: ");
+ } else {
+ ss.append("ACTION: ");
+ }
+ ss.append(String.format("%-20s | ", extMessageId));
+ if (status != null) {
+ ss.append(status.status);
+ } else {
+ ss.append(action.action);
+ if (action.holdTime > 0) {
+ ss.append(" | ").append(action.holdTime);
+ }
+ if (action.resolution != null) {
+ ss.append(" | ").append(action.resolution);
+ }
+ }
+ return ss.toString();
+ }
+
+ private static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java
new file mode 100644
index 000000000..ca1c4910d
--- /dev/null
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java
@@ -0,0 +1,222 @@
+package org.onap.ccsdk.features.lib.doorman.it;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.onap.ccsdk.features.lib.doorman.MessageClassifier;
+import org.onap.ccsdk.features.lib.doorman.MessageInterceptor;
+import org.onap.ccsdk.features.lib.doorman.MessageInterceptorFactory;
+import org.onap.ccsdk.features.lib.doorman.MessageProcessor;
+import org.onap.ccsdk.features.lib.doorman.MessageQueueHandler;
+import org.onap.ccsdk.features.lib.doorman.dao.MessageDaoImpl;
+import org.onap.ccsdk.features.lib.doorman.data.MessageData;
+import org.onap.ccsdk.features.lib.doorman.data.Queue;
+import org.onap.ccsdk.features.lib.doorman.impl.MessageInterceptorFactoryImpl;
+import org.onap.ccsdk.features.lib.doorman.testutil.DbUtil;
+import org.onap.ccsdk.features.lib.doorman.util.JsonUtil;
+import org.onap.ccsdk.features.lib.rlock.LockHelperImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class MessageQueueTest {
+
+ private static final Logger log = LoggerFactory.getLogger(MessageQueueTest.class);
+
+ private String test;
+
+ public MessageQueueTest(String test) {
+ this.test = test;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void test() throws Exception {
+ log.info("#########################################################################");
+ log.info("MessageQueueTest: " + test + " started");
+
+ String testSetupJson = readResource("it/" + test + "/test.json");
+ Map<String, Object> testSetup = (Map<String, Object>) JsonUtil.jsonToData(testSetupJson);
+
+ Map<String, MessageQueueHandler> handlerMap = new HashMap<>();
+
+ for (Object o : (List<Object>) testSetup.get("handler_list")) {
+ Map<String, Object> handlerSetup = (Map<String, Object>) o;
+ String queueType = (String) handlerSetup.get("queue_type");
+ String handlerClassName = (String) handlerSetup.get("handler_class");
+ try {
+ Class<?> handlerClass = Class.forName("org.onap.ccsdk.features.lib.doorman.it." + test + "." + handlerClassName);
+ MessageQueueHandler handler = (MessageQueueHandler) handlerClass.newInstance();
+ handlerMap.put(queueType, handler);
+ log.info("Handler found for queue type: " + queueType + ": " + handlerClass.getName());
+ } catch (Exception e) {
+ }
+ }
+
+ MessageInterceptorFactory factory = setupMessageInterceptorFactory(handlerMap, new Classifier(), new Processor());
+
+ List<Object> requestList = (List<Object>) testSetup.get("request_list");
+
+ List<Thread> threadList = new ArrayList<>();
+
+ for (int i = 0; i < requestList.size(); i++) {
+ Map<String, Object> requestSetup = (Map<String, Object>) requestList.get(i);
+
+ MessageData request = new MessageData();
+ request.param = (Map<String, Object>) requestSetup.get("request_param");
+ Map<String, Object> requestBodyData = (Map<String, Object>) requestSetup.get("request_body");
+ if (requestBodyData != null) {
+ request.body = JsonUtil.dataToJson(requestBodyData);
+ } else {
+ request.body = readResource("it/" + test + "/" + requestSetup.get("request_body_file"));
+ }
+
+ MessageData response = new MessageData();
+ response.param = (Map<String, Object>) requestSetup.get("response_param");
+ Map<String, Object> responseBodyData = (Map<String, Object>) requestSetup.get("response_body");
+ if (responseBodyData != null) {
+ response.body = JsonUtil.dataToJson(responseBodyData);
+ } else {
+ response.body = readResource("it/" + test + "/" + requestSetup.get("response_body_file"));
+ }
+
+ long startTime = (Long) requestSetup.get("start_time");
+ long processTime = (Long) requestSetup.get("process_time");
+
+ MessageInterceptor interceptor = factory.create();
+
+ Thread t = new Thread((Runnable) () -> {
+ try {
+ Thread.sleep(startTime);
+ } catch (InterruptedException e) {
+ }
+
+ MessageData r = interceptor.processRequest(request);
+
+ if (r == null) {
+ try {
+ Thread.sleep(processTime);
+ } catch (InterruptedException e) {
+ }
+
+ interceptor.processResponse(response);
+ }
+
+ }, "Message-" + i);
+
+ threadList.add(t);
+ t.start();
+ }
+
+ for (Thread t : threadList) {
+ t.join();
+ }
+
+ log.info("MessageQueueTest: " + test + " completed");
+ log.info("Result:");
+
+ String testResultJson = readResource("it/" + test + "/result.json");
+ Map<String, Object> testResult = (Map<String, Object>) JsonUtil.jsonToData(testResultJson);
+ MessageQueueTestResult.checkResult(testResult);
+ }
+
+ private static class Classifier implements MessageClassifier {
+
+ @Override
+ public Queue determineQueue(MessageData request) {
+ Queue q = new Queue();
+ q.type = (String) request.param.get("queue_type");
+ q.id = (String) request.param.get("queue_id");
+ return q;
+ }
+
+ @Override
+ public String getExtMessageId(MessageData request) {
+ return (String) request.param.get("request_id");
+ }
+ }
+
+ private static class Processor implements MessageProcessor {
+
+ @Override
+ public void processMessage(MessageData request) {
+ long processTime = (Long) request.param.get("process_time");
+ try {
+ Thread.sleep(processTime);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ public static MessageInterceptorFactory setupMessageInterceptorFactory(Map<String, MessageQueueHandler> handlerMap, MessageClassifier classifier, MessageProcessor processor) {
+ LockHelperImpl lockHelper = new LockHelperImpl();
+ lockHelper.setDataSource(DbUtil.getDataSource());
+ lockHelper.setLockWait(5);
+ lockHelper.setRetryCount(10);
+
+ MessageDaoImpl messageDao = new MessageDaoImpl();
+ messageDao.setDataSource(DbUtil.getDataSource());
+
+ MessageInterceptorFactoryImpl f = new MessageInterceptorFactoryImpl();
+ f.setMessageDao(messageDao);
+ f.setLockHelper(lockHelper);
+ f.setLockTimeout(20);
+ f.setHandlerMap(handlerMap);
+ f.setMessageClassifier(classifier);
+ f.setMessageProcessor(processor);
+ return f;
+ }
+
+ @Parameters(name = "{0}")
+ public static Collection<Object[]> allTests() {
+ List<Object[]> ll = new ArrayList<>();
+
+ String[] tcList = list("it");
+ for (String tc : tcList) {
+ ll.add(new Object[] { tc });
+ }
+ return ll;
+ }
+
+ private static String[] list(String dir) {
+ try {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ URL url = loader.getResource(dir);
+ String path = url.getPath();
+ return new File(path).list();
+ } catch (Exception e) {
+ log.warn("Error getting directory list for: " + dir + ": " + e.getMessage(), e);
+ return new String[0];
+ }
+ }
+
+ private static String readResource(String resource) {
+ try {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ InputStream ins = loader.getResourceAsStream(resource);
+ BufferedReader in = new BufferedReader(new InputStreamReader(ins));
+ StringBuilder ss = new StringBuilder();
+ String line = in.readLine();
+ while (line != null) {
+ ss.append(line).append('\n');
+ line = in.readLine();
+ }
+ in.close();
+ return ss.toString();
+ } catch (Exception e) {
+ log.warn("Error reading resource: " + resource + ": " + e.getMessage(), e);
+ return null;
+ }
+ }
+}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java
new file mode 100644
index 000000000..71a2eeafb
--- /dev/null
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java
@@ -0,0 +1,120 @@
+package org.onap.ccsdk.features.lib.doorman.it;
+
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.onap.ccsdk.features.lib.doorman.dao.MessageDaoImpl;
+import org.onap.ccsdk.features.lib.doorman.data.Message;
+import org.onap.ccsdk.features.lib.doorman.data.MessageAction;
+import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue;
+import org.onap.ccsdk.features.lib.doorman.data.MessageStatus;
+import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue;
+import org.onap.ccsdk.features.lib.doorman.data.Queue;
+import org.onap.ccsdk.features.lib.doorman.testutil.DbUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageQueueTestResult {
+
+ private static final Logger log = LoggerFactory.getLogger(MessageQueueTest.class);
+
+ public static List<MessageQueueDataItem> readResult(String queueType, String queueId) {
+ MessageDaoImpl messageDao = new MessageDaoImpl();
+ messageDao.setDataSource(DbUtil.getDataSource());
+
+ Queue q = new Queue();
+ q.type = queueType;
+ q.id = queueId;
+
+ List<Message> messageList = messageDao.readMessageQueue(q);
+
+ List<MessageQueueDataItem> ll = new ArrayList<>();
+ if (messageList != null) {
+ for (Message m : messageList) {
+ if (m.statusHistory != null) {
+ for (MessageStatus s : m.statusHistory) {
+ MessageQueueDataItem item = new MessageQueueDataItem();
+ item.extMessageId = m.extMessageId;
+ item.status = s;
+ item.timeStamp = s.timestamp;
+ ll.add(item);
+ }
+ }
+ if (m.actionHistory != null) {
+ for (MessageAction a : m.actionHistory) {
+ MessageQueueDataItem item = new MessageQueueDataItem();
+ item.extMessageId = m.extMessageId;
+ item.action = a;
+ item.timeStamp = a.timestamp;
+ ll.add(item);
+ }
+ }
+ }
+ }
+ Collections.sort(ll);
+ return ll;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void checkResult(Map<String, Object> testResult) {
+ List<Object> resultSetupList = (List<Object>) testResult.get("queue_list");
+ if (resultSetupList != null) {
+ for (Object o : resultSetupList) {
+ Map<String, Object> resultSetup = (Map<String, Object>) o;
+ String queueType = (String) resultSetup.get("queue_type");
+ String queueId = (String) resultSetup.get("queue_id");
+ log.info(queueType + "::" + queueId);
+ List<MessageQueueDataItem> itemList = MessageQueueTestResult.readResult(queueType, queueId);
+ for (MessageQueueDataItem item : itemList) {
+ log.info(" --- " + item);
+ }
+
+ List<Object> checkSequenceList = (List<Object>) resultSetup.get("check_sequence_list");
+ for (int i = 0; i < checkSequenceList.size(); i++) {
+ List<Object> checkSequence = (List<Object>) checkSequenceList.get(i);
+ List<MessageQueueDataItem> checkItemList = new ArrayList<>();
+ for (Object o2 : checkSequence) {
+ String[] ss = ((String) o2).split("\\|");
+ MessageQueueDataItem item = new MessageQueueDataItem();
+ item.extMessageId = ss[1].trim();
+ if (ss[0].trim().equals("STATUS")) {
+ item.status = new MessageStatus();
+ item.status.status = MessageStatusValue.valueOf(ss[2].trim());
+ } else {
+ item.action = new MessageAction();
+ item.action.action = MessageActionValue.valueOf(ss[2].trim());
+ if (item.action.action == MessageActionValue.HOLD || item.action.action == MessageActionValue.RETURN_HOLD) {
+ item.action.holdTime = Integer.parseInt(ss[3].trim());
+ } else if (item.action.action == MessageActionValue.RETURN_COMPLETE) {
+ item.action.resolution = ss[3].trim();
+ }
+ }
+ checkItemList.add(item);
+ }
+ List<MessageQueueDataItem> itemList1 = new ArrayList<>(itemList);
+ itemList1.retainAll(checkItemList);
+ if (!itemList1.equals(checkItemList)) {
+ log.info("Expected sequence #" + i + " not found");
+ log.info("Expected sequence:");
+ for (MessageQueueDataItem item : checkItemList) {
+ log.info(" --- " + item);
+ }
+ log.info("Found sequence:");
+ for (MessageQueueDataItem item : itemList1) {
+ log.info(" --- " + item);
+ }
+ fail("Expected sequence #" + i + " not found");
+ } else {
+ log.info("Expected sequence #" + i + " found in the result:");
+ for (MessageQueueDataItem item : checkItemList) {
+ log.info(" --- " + item);
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java
new file mode 100644
index 000000000..254fd4ce4
--- /dev/null
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java
@@ -0,0 +1,21 @@
+package org.onap.ccsdk.features.lib.doorman.it.tc1;
+
+import org.onap.ccsdk.features.lib.doorman.MessageClassifier;
+import org.onap.ccsdk.features.lib.doorman.data.MessageData;
+import org.onap.ccsdk.features.lib.doorman.data.Queue;
+
+public class Classifier implements MessageClassifier {
+
+ @Override
+ public Queue determineQueue(MessageData request) {
+ Queue q = new Queue();
+ q.type = "Cluster";
+ q.id = "test-queue";
+ return q;
+ }
+
+ @Override
+ public String getExtMessageId(MessageData request) {
+ return (String) request.param.get("request_id");
+ }
+}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java
new file mode 100644
index 000000000..f57fdbc36
--- /dev/null
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java
@@ -0,0 +1,10 @@
+package org.onap.ccsdk.features.lib.doorman.it.tc1;
+
+import org.onap.ccsdk.features.lib.doorman.impl.MessageHandlerBaseImpl;
+
+public class Handler extends MessageHandlerBaseImpl {
+
+ public Handler() {
+
+ }
+}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java
new file mode 100644
index 000000000..a32f74650
--- /dev/null
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java
@@ -0,0 +1,21 @@
+package org.onap.ccsdk.features.lib.doorman.it.tc2;
+
+import org.onap.ccsdk.features.lib.doorman.MessageClassifier;
+import org.onap.ccsdk.features.lib.doorman.data.MessageData;
+import org.onap.ccsdk.features.lib.doorman.data.Queue;
+
+public class Classifier implements MessageClassifier {
+
+ @Override
+ public Queue determineQueue(MessageData request) {
+ Queue q = new Queue();
+ q.type = "Cluster";
+ q.id = "test-queue";
+ return q;
+ }
+
+ @Override
+ public String getExtMessageId(MessageData request) {
+ return (String) request.param.get("request_id");
+ }
+}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java
new file mode 100644
index 000000000..35b811c5f
--- /dev/null
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java
@@ -0,0 +1,53 @@
+package org.onap.ccsdk.features.lib.doorman.it.tc2;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.onap.ccsdk.features.lib.doorman.data.Message;
+import org.onap.ccsdk.features.lib.doorman.data.MessageData;
+import org.onap.ccsdk.features.lib.doorman.impl.MessageHandlerBaseImpl;
+import org.onap.ccsdk.features.lib.doorman.util.JsonUtil;
+
+public class Handler extends MessageHandlerBaseImpl {
+
+ public Handler() {
+ setMaxParallelCount(100);
+ setUpdateWaitTime(20);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected String determineUpdateGroup(Message msg) {
+ if (msg.request.body != null) {
+ Map<String, Object> body = (Map<String, Object>) JsonUtil.jsonToData(msg.request.body);
+ String op = (String) body.get("operation");
+ String entityId = (String) body.get("entity_id");
+ if ("update".equals(op)) {
+ return entityId;
+ }
+ }
+ return super.determineUpdateGroup(msg);
+ }
+
+ @Override
+ protected long determineUpdateSequence(Message msg) {
+ if (msg.request.param != null) {
+ Long n = (Long) msg.request.param.get("sequence_number");
+ if (n != null) {
+ return n;
+ }
+ }
+ return super.determineUpdateSequence(msg);
+ }
+
+ @Override
+ protected MessageData completeResponse(Resolution r, Message msg) {
+ if (r == Resolution.SKIPPED) {
+ MessageData response = new MessageData();
+ response.param = new HashMap<>();
+ response.param.put("http_code", 200L);
+ response.body = "{ \"status\": \"success\" }";
+ return response;
+ }
+ return super.completeResponse(r, msg);
+ }
+}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java
new file mode 100644
index 000000000..1e525b4b2
--- /dev/null
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java
@@ -0,0 +1,92 @@
+package org.onap.ccsdk.features.lib.doorman.testutil;
+
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+
+import javax.sql.DataSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbUtil {
+
+ private static final Logger log = LoggerFactory.getLogger(DbUtil.class);
+
+ private static DataSource dataSource = null;
+
+ public static synchronized DataSource getDataSource() {
+ if (dataSource == null) {
+ String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1";
+
+ dataSource = new DataSource() {
+
+ @Override
+ public <T> T unwrap(Class<T> arg0) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> arg0) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setLoginTimeout(int arg0) throws SQLException {
+ }
+
+ @Override
+ public void setLogWriter(PrintWriter arg0) throws SQLException {
+ }
+
+ @Override
+ public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
+ return null;
+ }
+
+ @Override
+ public int getLoginTimeout() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public PrintWriter getLogWriter() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection(String username, String password) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return DriverManager.getConnection(url);
+ }
+ };
+
+ try {
+ String script = FileUtil.read("/schema.sql");
+
+ String[] sqlList = script.split(";");
+ try (Connection con = dataSource.getConnection()) {
+ for (String sql : sqlList) {
+ if (!sql.trim().isEmpty()) {
+ sql = sql.trim();
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ log.info("Executing statement:\n" + sql);
+ ps.execute();
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return dataSource;
+ }
+}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java
new file mode 100644
index 000000000..5ae92b4fc
--- /dev/null
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java
@@ -0,0 +1,24 @@
+package org.onap.ccsdk.features.lib.doorman.testutil;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+public class FileUtil {
+
+ public static String read(String fileName) throws Exception {
+ String ss = "";
+ try (InputStream is = DbUtil.class.getResourceAsStream(fileName)) {
+ try (InputStreamReader isr = new InputStreamReader(is)) {
+ try (BufferedReader in = new BufferedReader(isr)) {
+ String s = in.readLine();
+ while (s != null) {
+ ss += s + '\n';
+ s = in.readLine();
+ }
+ }
+ }
+ }
+ return ss;
+ }
+}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java
new file mode 100644
index 000000000..f0ac87e24
--- /dev/null
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java
@@ -0,0 +1,26 @@
+package org.onap.ccsdk.features.lib.doorman.util;
+
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.onap.ccsdk.features.lib.doorman.testutil.FileUtil;
+import org.onap.ccsdk.features.lib.doorman.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestJsonUtil {
+
+ private static final Logger log = LoggerFactory.getLogger(TestJsonUtil.class);
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testJsonConversion() throws Exception {
+ String json1 = FileUtil.read("/test1.json");
+ Map<String, Object> data1 = (Map<String, Object>) JsonUtil.jsonToData(json1);
+ String convJson1 = JsonUtil.dataToJson(data1);
+ log.info("Converted JSON:\n" + convJson1);
+ Map<String, Object> convData1 = (Map<String, Object>) JsonUtil.jsonToData(convJson1);
+ Assert.assertEquals(data1, convData1);
+ }
+}
diff --git a/lib/doorman/src/test/resources/it/tc1/result.json b/lib/doorman/src/test/resources/it/tc1/result.json
new file mode 100644
index 000000000..3b048827c
--- /dev/null
+++ b/lib/doorman/src/test/resources/it/tc1/result.json
@@ -0,0 +1,31 @@
+{
+ "queue_list": [
+ {
+ "queue_type": "QQ",
+ "queue_id": "ALL",
+ "check_sequence_list": [
+ [
+ "STATUS | req-1 | ARRIVED",
+ "ACTION | req-1 | PROCESS",
+ "STATUS | req-1 | PROCESSING_SYNC",
+ "STATUS | req-2 | ARRIVED",
+ "ACTION | req-2 | HOLD | 60",
+ "STATUS | req-2 | IN_QUEUE",
+ "STATUS | req-3 | ARRIVED",
+ "ACTION | req-3 | HOLD | 60",
+ "STATUS | req-3 | IN_QUEUE",
+ "ACTION | req-1 | RETURN_COMPLETE | PROCESSED",
+ "ACTION | req-2 | PROCESS",
+ "STATUS | req-1 | COMPLETED",
+ "STATUS | req-2 | PROCESSING_SYNC",
+ "ACTION | req-2 | RETURN_COMPLETE | PROCESSED",
+ "ACTION | req-3 | PROCESS",
+ "STATUS | req-2 | COMPLETED",
+ "STATUS | req-3 | PROCESSING_SYNC",
+ "ACTION | req-3 | RETURN_COMPLETE | PROCESSED",
+ "STATUS | req-3 | COMPLETED"
+ ]
+ ]
+ }
+ ]
+}
diff --git a/lib/doorman/src/test/resources/it/tc1/test.json b/lib/doorman/src/test/resources/it/tc1/test.json
new file mode 100644
index 000000000..0d96ce8dd
--- /dev/null
+++ b/lib/doorman/src/test/resources/it/tc1/test.json
@@ -0,0 +1,67 @@
+{
+ "handler_list": [
+ {
+ "queue_type": "QQ",
+ "handler_class": "Handler"
+ }
+ ],
+ "request_list": [
+ {
+ "request_param": {
+ "request_id": "req-1",
+ "queue_type": "QQ",
+ "queue_id": "ALL",
+ "http_method": "POST"
+ },
+ "request_body": {
+ "siid": "test-siid-1"
+ },
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body": {
+ "status": "success"
+ },
+ "start_time": 100,
+ "process_time": 5000
+ },
+ {
+ "request_param": {
+ "request_id": "req-2",
+ "queue_type": "QQ",
+ "queue_id": "ALL",
+ "http_method": "POST"
+ },
+ "request_body": {
+ "siid": "test-siid-2"
+ },
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body": {
+ "status": "success"
+ },
+ "start_time": 500,
+ "process_time": 5000
+ },
+ {
+ "request_param": {
+ "request_id": "req-3",
+ "queue_type": "QQ",
+ "queue_id": "ALL",
+ "http_method": "POST"
+ },
+ "request_body": {
+ "siid": "test-siid-3"
+ },
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body": {
+ "status": "success"
+ },
+ "start_time": 900,
+ "process_time": 5000
+ }
+ ]
+}
diff --git a/lib/doorman/src/test/resources/it/tc2/request_entity1.txt b/lib/doorman/src/test/resources/it/tc2/request_entity1.txt
new file mode 100644
index 000000000..abda4f381
--- /dev/null
+++ b/lib/doorman/src/test/resources/it/tc2/request_entity1.txt
@@ -0,0 +1,4 @@
+{
+ "operation": "update",
+ "entity_id": "entity1"
+}
diff --git a/lib/doorman/src/test/resources/it/tc2/request_entity2.txt b/lib/doorman/src/test/resources/it/tc2/request_entity2.txt
new file mode 100644
index 000000000..eb6ef6d92
--- /dev/null
+++ b/lib/doorman/src/test/resources/it/tc2/request_entity2.txt
@@ -0,0 +1,4 @@
+{
+ "operation": "update",
+ "entity_id": "entity2"
+}
diff --git a/lib/doorman/src/test/resources/it/tc2/request_other.txt b/lib/doorman/src/test/resources/it/tc2/request_other.txt
new file mode 100644
index 000000000..7763aeb7d
--- /dev/null
+++ b/lib/doorman/src/test/resources/it/tc2/request_other.txt
@@ -0,0 +1,4 @@
+{
+ "operation": "add",
+ "entity_id": "entity5"
+}
diff --git a/lib/doorman/src/test/resources/it/tc2/response.txt b/lib/doorman/src/test/resources/it/tc2/response.txt
new file mode 100644
index 000000000..21aad5aed
--- /dev/null
+++ b/lib/doorman/src/test/resources/it/tc2/response.txt
@@ -0,0 +1,3 @@
+{
+ "status": "success",
+}
diff --git a/lib/doorman/src/test/resources/it/tc2/result.json b/lib/doorman/src/test/resources/it/tc2/result.json
new file mode 100644
index 000000000..d2df2ea7b
--- /dev/null
+++ b/lib/doorman/src/test/resources/it/tc2/result.json
@@ -0,0 +1,94 @@
+{
+ "queue_list": [
+ {
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "check_sequence_list": [
+ [
+ "STATUS | req-entity1-1 | ARRIVED",
+ "ACTION | req-entity1-1 | HOLD | 20",
+ "STATUS | req-entity1-1 | IN_QUEUE",
+ "STATUS | req-entity1-2 | ARRIVED",
+ "ACTION | req-entity1-1 | RETURN_COMPLETE | SKIPPED",
+ "ACTION | req-entity1-2 | HOLD | 20",
+ "STATUS | req-entity1-2 | IN_QUEUE",
+ "STATUS | req-entity1-3 | ARRIVED",
+ "ACTION | req-entity1-2 | RETURN_COMPLETE | SKIPPED",
+ "ACTION | req-entity1-3 | HOLD | 20",
+ "STATUS | req-entity1-3 | IN_QUEUE",
+ "STATUS | req-entity1-4 | ARRIVED",
+ "ACTION | req-entity1-3 | RETURN_COMPLETE | SKIPPED",
+ "ACTION | req-entity1-4 | HOLD | 20",
+ "STATUS | req-entity1-4 | IN_QUEUE",
+ "ACTION | req-entity1-4 | PROCESS",
+ "STATUS | req-entity1-4 | PROCESSING_SYNC",
+ "ACTION | req-entity1-4 | RETURN_COMPLETE | PROCESSED",
+ "STATUS | req-entity1-4 | COMPLETED"
+ ],
+ [
+ "STATUS | req-entity2-1 | ARRIVED",
+ "ACTION | req-entity2-1 | HOLD | 20",
+ "STATUS | req-entity2-1 | IN_QUEUE",
+ "STATUS | req-entity2-2 | ARRIVED",
+ "ACTION | req-entity2-1 | RETURN_COMPLETE | SKIPPED",
+ "ACTION | req-entity2-2 | HOLD | 20",
+ "STATUS | req-entity2-2 | IN_QUEUE",
+ "STATUS | req-entity2-3 | ARRIVED",
+ "ACTION | req-entity2-2 | RETURN_COMPLETE | SKIPPED",
+ "ACTION | req-entity2-3 | HOLD | 20",
+ "STATUS | req-entity2-3 | IN_QUEUE",
+ "STATUS | req-entity2-4 | ARRIVED",
+ "ACTION | req-entity2-3 | RETURN_COMPLETE | SKIPPED",
+ "ACTION | req-entity2-4 | HOLD | 20",
+ "STATUS | req-entity2-4 | IN_QUEUE",
+ "ACTION | req-entity2-4 | PROCESS",
+ "STATUS | req-entity2-4 | PROCESSING_SYNC",
+ "ACTION | req-entity2-4 | RETURN_COMPLETE | PROCESSED",
+ "STATUS | req-entity2-4 | COMPLETED"
+ ],
+ [
+ "STATUS | req-entity1-1 | ARRIVED",
+ "ACTION | req-entity1-1 | HOLD | 20",
+ "STATUS | req-entity1-1 | IN_QUEUE",
+ "ACTION | req-entity1-1 | RETURN_COMPLETE | SKIPPED",
+ "STATUS | req-entity1-1 | COMPLETED"
+ ],
+ [
+ "STATUS | req-entity1-2 | ARRIVED",
+ "ACTION | req-entity1-2 | HOLD | 20",
+ "STATUS | req-entity1-2 | IN_QUEUE",
+ "ACTION | req-entity1-2 | RETURN_COMPLETE | SKIPPED",
+ "STATUS | req-entity1-2 | COMPLETED"
+ ],
+ [
+ "STATUS | req-entity1-3 | ARRIVED",
+ "ACTION | req-entity1-3 | HOLD | 20",
+ "STATUS | req-entity1-3 | IN_QUEUE",
+ "ACTION | req-entity1-3 | RETURN_COMPLETE | SKIPPED",
+ "STATUS | req-entity1-3 | COMPLETED"
+ ],
+ [
+ "STATUS | req-entity2-1 | ARRIVED",
+ "ACTION | req-entity2-1 | HOLD | 20",
+ "STATUS | req-entity2-1 | IN_QUEUE",
+ "ACTION | req-entity2-1 | RETURN_COMPLETE | SKIPPED",
+ "STATUS | req-entity2-1 | COMPLETED"
+ ],
+ [
+ "STATUS | req-entity2-2 | ARRIVED",
+ "ACTION | req-entity2-2 | HOLD | 20",
+ "STATUS | req-entity2-2 | IN_QUEUE",
+ "ACTION | req-entity2-2 | RETURN_COMPLETE | SKIPPED",
+ "STATUS | req-entity2-2 | COMPLETED"
+ ],
+ [
+ "STATUS | req-entity2-3 | ARRIVED",
+ "ACTION | req-entity2-3 | HOLD | 20",
+ "STATUS | req-entity2-3 | IN_QUEUE",
+ "ACTION | req-entity2-3 | RETURN_COMPLETE | SKIPPED",
+ "STATUS | req-entity2-3 | COMPLETED"
+ ]
+ ]
+ }
+ ]
+}
diff --git a/lib/doorman/src/test/resources/it/tc2/result2.json b/lib/doorman/src/test/resources/it/tc2/result2.json
new file mode 100644
index 000000000..551f1ae17
--- /dev/null
+++ b/lib/doorman/src/test/resources/it/tc2/result2.json
@@ -0,0 +1,76 @@
+{
+ "queue_list": [
+ {
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "check_sequence_list": [
+ [
+ "STATUS | req-entity1-1 | ARRIVED",
+ "ACTION | req-entity1-1 | HOLD | 20",
+ "STATUS | req-entity1-1 | IN_QUEUE",
+ "STATUS | req-other-1 | ARRIVED",
+ "ACTION | req-other-1 | PROCESS",
+ "STATUS | req-other-1 | PROCESSING_SYNC",
+ "STATUS | req-entity2-1 | ARRIVED",
+ "ACTION | req-entity2-1 | HOLD | 20",
+ "STATUS | req-entity2-1 | IN_QUEUE",
+ "STATUS | req-other-2 | ARRIVED",
+ "ACTION | req-other-2 | PROCESS",
+ "STATUS | req-other-2 | PROCESSING_SYNC",
+ "STATUS | req-entity1-2 | ARRIVED",
+ "ACTION | req-entity1-1 | RETURN_COMPLETE | SKIPPED",
+ "ACTION | req-entity1-2 | HOLD | 20",
+ "STATUS | req-entity1-2 | IN_QUEUE",
+ "STATUS | req-entity2-2 | ARRIVED",
+ "ACTION | req-entity2-1 | RETURN_COMPLETE | SKIPPED",
+ "ACTION | req-entity2-2 | HOLD | 20",
+ "STATUS | req-entity2-2 | IN_QUEUE",
+ "STATUS | req-entity1-3 | ARRIVED",
+ "ACTION | req-entity1-2 | RETURN_COMPLETE | SKIPPED",
+ "ACTION | req-entity1-3 | HOLD | 20",
+ "STATUS | req-entity1-3 | IN_QUEUE",
+ "STATUS | req-other-3 | ARRIVED",
+ "ACTION | req-other-3 | PROCESS",
+ "STATUS | req-other-3 | PROCESSING_SYNC",
+ "STATUS | req-entity2-3 | ARRIVED",
+ "ACTION | req-entity2-2 | RETURN_COMPLETE | SKIPPED",
+ "ACTION | req-entity2-3 | HOLD | 20",
+ "STATUS | req-entity2-3 | IN_QUEUE",
+ "STATUS | req-entity1-4 | ARRIVED",
+ "ACTION | req-entity1-3 | RETURN_COMPLETE | SKIPPED",
+ "ACTION | req-entity1-4 | HOLD | 20",
+ "STATUS | req-entity1-4 | IN_QUEUE",
+ "STATUS | req-entity2-4 | ARRIVED",
+ "ACTION | req-entity2-3 | RETURN_COMPLETE | SKIPPED",
+ "ACTION | req-entity2-4 | HOLD | 20",
+ "STATUS | req-entity2-4 | IN_QUEUE",
+ "STATUS | req-other-4 | ARRIVED",
+ "ACTION | req-other-4 | PROCESS",
+ "STATUS | req-other-4 | PROCESSING_SYNC",
+ "STATUS | req-entity1-1 | COMPLETED",
+ "ACTION | req-other-1 | RETURN_COMPLETE | PROCESSED",
+ "STATUS | req-other-1 | COMPLETED",
+ "STATUS | req-entity2-1 | COMPLETED",
+ "ACTION | req-other-2 | RETURN_COMPLETE | PROCESSED",
+ "STATUS | req-other-2 | COMPLETED",
+ "STATUS | req-entity1-2 | COMPLETED",
+ "STATUS | req-entity2-2 | COMPLETED",
+ "STATUS | req-entity1-3 | COMPLETED",
+ "ACTION | req-other-3 | RETURN_COMPLETE | PROCESSED",
+ "STATUS | req-other-3 | COMPLETED",
+ "STATUS | req-entity2-3 | COMPLETED",
+ "ACTION | req-other-4 | RETURN_COMPLETE | PROCESSED",
+ "STATUS | req-other-4 | COMPLETED",
+ "ACTION | req-entity1-4 | PROCESS",
+ "ACTION | req-entity2-4 | PROCESS",
+ "STATUS | req-entity1-4 | PROCESSING_SYNC",
+ "STATUS | req-entity2-4 | PROCESSING_SYNC",
+ "ACTION | req-entity1-4 | RETURN_COMPLETE | PROCESSED",
+ "STATUS | req-entity1-4 | COMPLETED",
+ "ACTION | req-entity2-4 | RETURN_COMPLETE | PROCESSED",
+ "STATUS | req-entity2-4 | COMPLETED"
+ ]
+ ]
+ }
+ ]
+}
diff --git a/lib/doorman/src/test/resources/it/tc2/test.json b/lib/doorman/src/test/resources/it/tc2/test.json
new file mode 100644
index 000000000..82a0546ac
--- /dev/null
+++ b/lib/doorman/src/test/resources/it/tc2/test.json
@@ -0,0 +1,202 @@
+{
+ "handler_list": [
+ {
+ "queue_type": "Cluster",
+ "handler_class": "Handler"
+ }
+ ],
+ "request_list": [
+ {
+ "request_param": {
+ "request_id": "req-entity1-1",
+ "sequence_number": 1,
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "http_method": "POST"
+ },
+ "request_body_file": "request_entity1.txt",
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body_file": "response.txt",
+ "start_time": 100,
+ "process_time": 5000
+ },
+ {
+ "request_param": {
+ "request_id": "req-other-1",
+ "sequence_number": 2,
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "http_method": "POST"
+ },
+ "request_body_file": "request_other.txt",
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body_file": "response.txt",
+ "start_time": 200,
+ "process_time": 5000
+ },
+ {
+ "request_param": {
+ "request_id": "req-entity2-1",
+ "sequence_number": 3,
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "http_method": "POST"
+ },
+ "request_body_file": "request_entity2.txt",
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body_file": "response.txt",
+ "start_time": 300,
+ "process_time": 5000
+ },
+ {
+ "request_param": {
+ "request_id": "req-other-2",
+ "sequence_number": 4,
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "http_method": "POST"
+ },
+ "request_body_file": "request_other.txt",
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body_file": "response.txt",
+ "start_time": 400,
+ "process_time": 5000
+ },
+ {
+ "request_param": {
+ "request_id": "req-entity1-2",
+ "sequence_number": 5,
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "http_method": "POST"
+ },
+ "request_body_file": "request_entity1.txt",
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body_file": "response.txt",
+ "start_time": 500,
+ "process_time": 5000
+ },
+ {
+ "request_param": {
+ "request_id": "req-entity2-2",
+ "sequence_number": 6,
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "http_method": "POST"
+ },
+ "request_body_file": "request_entity2.txt",
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body_file": "response.txt",
+ "start_time": 600,
+ "process_time": 5000
+ },
+ {
+ "request_param": {
+ "request_id": "req-entity1-3",
+ "sequence_number": 7,
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "http_method": "POST"
+ },
+ "request_body_file": "request_entity1.txt",
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body_file": "response.txt",
+ "start_time": 700,
+ "process_time": 5000
+ },
+ {
+ "request_param": {
+ "request_id": "req-other-3",
+ "sequence_number": 8,
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "http_method": "POST"
+ },
+ "request_body_file": "request_other.txt",
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body_file": "response.txt",
+ "start_time": 800,
+ "process_time": 5000
+ },
+ {
+ "request_param": {
+ "request_id": "req-entity2-3",
+ "sequence_number": 9,
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "http_method": "POST"
+ },
+ "request_body_file": "request_entity2.txt",
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body_file": "response.txt",
+ "start_time": 900,
+ "process_time": 5000
+ },
+ {
+ "request_param": {
+ "request_id": "req-entity1-4",
+ "sequence_number": 10,
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "http_method": "POST"
+ },
+ "request_body_file": "request_entity1.txt",
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body_file": "response.txt",
+ "start_time": 3100,
+ "process_time": 5000
+ },
+ {
+ "request_param": {
+ "request_id": "req-entity2-4",
+ "sequence_number": 11,
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "http_method": "POST"
+ },
+ "request_body_file": "request_entity2.txt",
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body_file": "response.txt",
+ "start_time": 3200,
+ "process_time": 5000
+ },
+ {
+ "request_param": {
+ "request_id": "req-other-4",
+ "sequence_number": 12,
+ "queue_type": "Cluster",
+ "queue_id": "Cluster",
+ "http_method": "POST"
+ },
+ "request_body_file": "request_other.txt",
+ "response_param": {
+ "http_code": 200
+ },
+ "response_body_file": "response.txt",
+ "start_time": 3300,
+ "process_time": 5000
+ }
+ ]
+}
diff --git a/lib/doorman/src/test/resources/schema.sql b/lib/doorman/src/test/resources/schema.sql
new file mode 100644
index 000000000..d110e64ca
--- /dev/null
+++ b/lib/doorman/src/test/resources/schema.sql
@@ -0,0 +1,58 @@
+CREATE TABLE IF NOT EXISTS `resource_lock` (
+ `resource_lock_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ `resource_name` varchar(256),
+ `lock_holder` varchar(100) NOT NULL,
+ `lock_count` smallint(6) NOT NULL,
+ `lock_time` datetime NOT NULL,
+ `expiration_time` datetime NOT NULL,
+ PRIMARY KEY (`resource_lock_id`),
+ UNIQUE KEY `IX1_RESOURCE_LOCK` (`resource_name`)
+);
+
+CREATE TABLE IF NOT EXISTS `message` (
+ `message_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ `ext_message_id` varchar(64),
+ `request_param` TEXT NOT NULL,
+ `request_body` TEXT NOT NULL,
+ `arrived_timestamp` datetime NOT NULL,
+ `started_timestamp` datetime,
+ `completed_timestamp` datetime,
+ `response_timestamp` datetime,
+ `response_param` TEXT,
+ `response_body` TEXT,
+ `resolution` varchar(20),
+ `queue_type` varchar(64),
+ `queue_id` varchar(256),
+ PRIMARY KEY (`message_id`)
+);
+
+CREATE INDEX IF NOT EXISTS `ix1_message`
+ON message(queue_type, queue_id);
+
+CREATE TABLE IF NOT EXISTS `message_status` (
+ `message_status_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ `message_id` bigint(20) unsigned NOT NULL REFERENCES message(message_id),
+ `status` varchar(20) NOT NULL,
+ `status_timestamp` datetime NOT NULL,
+ PRIMARY KEY (`message_status_id`)
+);
+
+CREATE INDEX IF NOT EXISTS `ix1_message_status`
+ON message_status(message_id);
+
+CREATE TABLE IF NOT EXISTS `message_action` (
+ `message_action_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ `message_id` bigint(20) unsigned NOT NULL REFERENCES message(message_id),
+ `action` varchar(20) NOT NULL,
+ `action_status` varchar(20) NOT NULL,
+ `resolution` varchar(20),
+ `action_timestamp` datetime NOT NULL,
+ `done_timestamp` datetime,
+ `hold_time` int,
+ `response_param` TEXT,
+ `response_body` TEXT,
+ PRIMARY KEY (`message_action_id`)
+);
+
+CREATE INDEX IF NOT EXISTS `ix1_message_action`
+ON message_action(message_id);
diff --git a/lib/doorman/src/test/resources/test1.json b/lib/doorman/src/test/resources/test1.json
new file mode 100644
index 000000000..820675c7e
--- /dev/null
+++ b/lib/doorman/src/test/resources/test1.json
@@ -0,0 +1,36 @@
+{
+ "struct": {
+ "simple-list": [
+ "val1",
+ "val2",
+ null,
+ false,
+ true,
+ 123,
+ 0.123,
+ 1.2334e-5
+ ],
+ "list": [
+ {
+ "name1": "val1",
+ "name2": "val2"
+ },
+ {
+ "name1": "val1",
+ "name2": "val2"
+ },
+ {
+ "integer": 34,
+ "fraction": 0.2145,
+ "exponent": 6.61789e+0,
+ "boolean": true
+ },
+ [ "val1", "val2", "blah \"string\" \t\t\t\n" ] ]
+ },
+ "types": {
+ "integer": 34,
+ "fraction": 0.2145,
+ "exponent": 6.61789e+0,
+ "boolean": true
+ }
+}
diff --git a/lib/rlock/pom.xml b/lib/rlock/pom.xml
new file mode 100644
index 000000000..51b778bad
--- /dev/null
+++ b/lib/rlock/pom.xml
@@ -0,0 +1,43 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.ccsdk.features</groupId>
+ <artifactId>ccsdk-features</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.onap.ccsdk.features.lib.rlock</groupId>
+ <artifactId>rlock</artifactId>
+
+ <description>Resource Lock - Distributed locking feature using database</description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java
new file mode 100644
index 000000000..17817fe1d
--- /dev/null
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java
@@ -0,0 +1,14 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+import java.util.Collection;
+
+public interface LockHelper {
+
+ void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */);
+
+ void unlock(String resourceName, boolean force);
+
+ void lock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */);
+
+ void unlock(Collection<String> resourceNameList, boolean force);
+}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java
new file mode 100644
index 000000000..666fb6af5
--- /dev/null
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java
@@ -0,0 +1,173 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LockHelperImpl implements LockHelper {
+
+ private static final Logger log = LoggerFactory.getLogger(LockHelperImpl.class);
+
+ private int retryCount = 20;
+ private int lockWait = 5; // Seconds
+
+ private DataSource dataSource;
+
+ @Override
+ public void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */) {
+ lock(Collections.singleton(resourceName), lockRequester, lockTimeout);
+ }
+
+ @Override
+ public void unlock(String resourceName, boolean force) {
+ unlock(Collections.singleton(resourceName), force);
+ }
+
+ @Override
+ public void lock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */) {
+ for (int i = 0; true; i++) {
+ try {
+ tryLock(resourceNameList, lockRequester, lockTimeout);
+ log.info("Resources locked: " + resourceNameList);
+ return;
+ } catch (ResourceLockedException e) {
+ if (i > retryCount) {
+ throw e;
+ }
+ try {
+ Thread.sleep(lockWait * 1000L);
+ } catch (InterruptedException ex) {
+ }
+ }
+ }
+ }
+
+ @Override
+ public void unlock(Collection<String> lockNames, boolean force) {
+ if (lockNames == null || lockNames.size() == 0) {
+ return;
+ }
+
+ try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) {
+ try {
+ for (String name : lockNames) {
+ ResourceLock l = resourceLockDao.getByResourceName(name);
+ if (l != null) {
+ if (force || l.lockCount == 1) {
+ resourceLockDao.delete(l.id);
+ } else {
+ resourceLockDao.decrementLockCount(l.id);
+ }
+ }
+ }
+ resourceLockDao.commit();
+ log.info("Resources unlocked: " + lockNames);
+ } catch (Exception e) {
+ resourceLockDao.rollback();
+ }
+ }
+ }
+
+ public void tryLock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */) {
+ if (resourceNameList == null || resourceNameList.isEmpty()) {
+ return;
+ }
+
+ lockRequester = generateLockRequester(lockRequester, 100);
+
+ // First check if all requested records are available to lock
+
+ Date now = new Date();
+
+ try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) {
+ try {
+ List<ResourceLock> dbLockList = new ArrayList<>();
+ List<String> insertLockNameList = new ArrayList<>();
+ for (String name : resourceNameList) {
+ ResourceLock l = resourceLockDao.getByResourceName(name);
+
+ boolean canLock = l == null || now.getTime() > l.expirationTime.getTime() || lockRequester != null && lockRequester.equals(l.lockHolder) || l.lockCount <= 0;
+ if (!canLock) {
+ throw new ResourceLockedException(l.resourceName, l.lockHolder, lockRequester);
+ }
+
+ if (l != null) {
+ if (now.getTime() > l.expirationTime.getTime() || l.lockCount <= 0) {
+ l.lockCount = 0;
+ }
+ dbLockList.add(l);
+ } else {
+ insertLockNameList.add(name);
+ }
+ }
+
+ // Update the lock info in DB
+ for (ResourceLock l : dbLockList) {
+ resourceLockDao.update(l.id, lockRequester, now, new Date(now.getTime() + lockTimeout * 1000), l.lockCount + 1);
+ }
+
+ // Insert records for those that are not yet there
+ for (String lockName : insertLockNameList) {
+ ResourceLock l = new ResourceLock();
+ l.resourceName = lockName;
+ l.lockHolder = lockRequester;
+ l.lockTime = now;
+ l.expirationTime = new Date(now.getTime() + lockTimeout * 1000);
+ l.lockCount = 1;
+
+ try {
+ resourceLockDao.add(l);
+ } catch (Exception e) {
+ throw new ResourceLockedException(l.resourceName, "unknown", lockRequester);
+ }
+ }
+
+ resourceLockDao.commit();
+
+ } catch (Exception e) {
+ resourceLockDao.rollback();
+ throw e;
+ }
+ }
+ }
+
+ private static String generateLockRequester(String name, int maxLength) {
+ if (name == null) {
+ name = "";
+ }
+ int l1 = name.length();
+ String tname = Thread.currentThread().getName();
+ int l2 = tname.length();
+ if (l1 + l2 + 1 > maxLength) {
+ int maxl1 = maxLength / 2;
+ if (l1 > maxl1) {
+ name = name.substring(0, maxl1);
+ l1 = maxl1;
+ }
+ int maxl2 = maxLength - l1 - 1;
+ if (l2 > maxl2) {
+ tname = tname.substring(0, 6) + "..." + tname.substring(l2 - maxl2 + 9);
+ }
+ }
+ return tname + '-' + name;
+ }
+
+ public void setRetryCount(int retryCount) {
+ this.retryCount = retryCount;
+ }
+
+ public void setLockWait(int lockWait) {
+ this.lockWait = lockWait;
+ }
+
+ public void setDataSource(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java
new file mode 100644
index 000000000..a7e966855
--- /dev/null
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java
@@ -0,0 +1,13 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+import java.util.Date;
+
+public class ResourceLock {
+
+ public long id;
+ public String resourceName;
+ public String lockHolder;
+ public int lockCount;
+ public Date lockTime;
+ public Date expirationTime;
+}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java
new file mode 100644
index 000000000..4833bb28e
--- /dev/null
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java
@@ -0,0 +1,122 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Date;
+
+import javax.sql.DataSource;
+
+public class ResourceLockDao implements AutoCloseable {
+
+ private Connection con;
+
+ public ResourceLockDao(DataSource dataSource) {
+ try {
+ con = dataSource.getConnection();
+ con.setAutoCommit(false);
+ } catch (SQLException e) {
+ throw new RuntimeException("Error getting DB connection: " + e.getMessage(), e);
+ }
+ }
+
+ public void add(ResourceLock l) {
+ String sql = "INSERT INTO RESOURCE_LOCK (resource_name, lock_holder, lock_count, lock_time, expiration_time)\n" + "VALUES (?, ?, ?, ?, ?)";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setString(1, l.resourceName);
+ ps.setString(2, l.lockHolder);
+ ps.setInt(3, l.lockCount);
+ ps.setTimestamp(4, new Timestamp(l.lockTime.getTime()));
+ ps.setTimestamp(5, new Timestamp(l.expirationTime.getTime()));
+ ps.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error adding lock to DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void update(long id, String lockHolder, Date lockTime, Date expirationTime, int lockCount) {
+ String sql = "UPDATE RESOURCE_LOCK SET lock_holder = ?, lock_time = ?, expiration_time = ?, lock_count = ? WHERE resource_lock_id = ?";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setString(1, lockHolder);
+ ps.setTimestamp(2, new Timestamp(lockTime.getTime()));
+ ps.setTimestamp(3, new Timestamp(expirationTime.getTime()));
+ ps.setInt(4, lockCount);
+ ps.setLong(5, id);
+ ps.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error updating lock in DB: " + e.getMessage(), e);
+ }
+ }
+
+ public ResourceLock getByResourceName(String resourceName) {
+ String sql = "SELECT * FROM RESOURCE_LOCK WHERE resource_name = ?";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setString(1, resourceName);
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ ResourceLock rl = new ResourceLock();
+ rl.id = rs.getLong("resource_lock_id");
+ rl.resourceName = rs.getString("resource_name");
+ rl.lockHolder = rs.getString("lock_holder");
+ rl.lockCount = rs.getInt("lock_count");
+ rl.lockTime = rs.getTimestamp("lock_time");
+ rl.expirationTime = rs.getTimestamp("expiration_time");
+ return rl;
+ }
+ return null;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error reading lock from DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void delete(long id) {
+ String sql = "DELETE FROM RESOURCE_LOCK WHERE resource_lock_id = ?";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setLong(1, id);
+ ps.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error deleting lock from DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void decrementLockCount(long id) {
+ String sql = "UPDATE RESOURCE_LOCK SET lock_count = lock_count - 1 WHERE resource_lock_id = ?";
+
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setLong(1, id);
+ ps.execute();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error updating lock count in DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void commit() {
+ try {
+ con.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException("Error committing DB connection: " + e.getMessage(), e);
+ }
+ }
+
+ public void rollback() {
+ try {
+ con.rollback();
+ } catch (SQLException e) {
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ con.close();
+ } catch (SQLException e) {
+ }
+ }
+}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java
new file mode 100644
index 000000000..7c8cfa122
--- /dev/null
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java
@@ -0,0 +1,20 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+public class ResourceLockedException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ private String lockName, lockHolder, lockRequester;
+
+ public ResourceLockedException(String lockName, String lockHolder, String lockRequester) {
+ this.lockName = lockName;
+ this.lockHolder = lockHolder;
+ this.lockRequester = lockRequester;
+ }
+
+ @Override
+ public String getMessage() {
+ return "Failed to lock [" + lockName + "] for [" + lockRequester + "]. Currently locked by [" + lockHolder +
+ "].";
+ }
+}
diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java
new file mode 100644
index 000000000..ff25e16f0
--- /dev/null
+++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java
@@ -0,0 +1,35 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+public abstract class SynchronizedFunction {
+
+ private Set<String> synchset;
+ private String lockRequester;
+ private int lockTimeout; // Seconds
+ private LockHelper lockHelper;
+
+ protected SynchronizedFunction(LockHelper lockHelper, Collection<String> synchset, int lockTimeout) {
+ this.lockHelper = lockHelper;
+ this.synchset = new HashSet<String>(synchset);
+ this.lockRequester = generateLockRequester();
+ this.lockTimeout = lockTimeout;
+ }
+
+ protected abstract void _exec();
+
+ public void exec() {
+ lockHelper.lock(synchset, lockRequester, lockTimeout);
+ try {
+ _exec();
+ } finally {
+ lockHelper.unlock(synchset, true);
+ }
+ }
+
+ private static String generateLockRequester() {
+ return "SynchronizedFunction-" + (int) (Math.random() * 1000000);
+ }
+}
diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java
new file mode 100644
index 000000000..9f37894c9
--- /dev/null
+++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java
@@ -0,0 +1,51 @@
+package org.onap.ccsdk.features.lib.rlock;
+
+import org.junit.Test;
+import org.onap.ccsdk.features.lib.rlock.testutils.DbUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestLockHelper {
+
+ private static final Logger log = LoggerFactory.getLogger(TestLockHelper.class);
+
+ @Test
+ public void test1() throws Exception {
+ LockThread t1 = new LockThread("req1");
+ LockThread t2 = new LockThread("req2");
+ LockThread t3 = new LockThread("req3");
+
+ t1.start();
+ t2.start();
+ t3.start();
+
+ t1.join();
+ t2.join();
+ t3.join();
+ }
+
+ private class LockThread extends Thread {
+
+ private String requester;
+
+ public LockThread(String requester) {
+ this.requester = requester;
+ }
+
+ @Override
+ public void run() {
+ LockHelperImpl lockHelper = new LockHelperImpl();
+ lockHelper.setDataSource(DbUtil.getDataSource());
+
+ lockHelper.lock("resource1", requester, 20);
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ log.warn("Thread interrupted: " + e.getMessage(), e);
+ }
+
+ lockHelper.unlock("resource1", false);
+ }
+ }
+}
diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java
new file mode 100644
index 000000000..38d4d62c1
--- /dev/null
+++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java
@@ -0,0 +1,92 @@
+package org.onap.ccsdk.features.lib.rlock.testutils;
+
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+
+import javax.sql.DataSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbUtil {
+
+ private static final Logger log = LoggerFactory.getLogger(DbUtil.class);
+
+ private static DataSource dataSource = null;
+
+ public static synchronized DataSource getDataSource() {
+ if (dataSource == null) {
+ String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1";
+
+ dataSource = new DataSource() {
+
+ @Override
+ public <T> T unwrap(Class<T> arg0) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> arg0) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setLoginTimeout(int arg0) throws SQLException {
+ }
+
+ @Override
+ public void setLogWriter(PrintWriter arg0) throws SQLException {
+ }
+
+ @Override
+ public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
+ return null;
+ }
+
+ @Override
+ public int getLoginTimeout() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public PrintWriter getLogWriter() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection(String username, String password) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return DriverManager.getConnection(url);
+ }
+ };
+
+ try {
+ String script = FileUtil.read("/schema.sql");
+
+ String[] sqlList = script.split(";");
+ try (Connection con = dataSource.getConnection()) {
+ for (String sql : sqlList) {
+ if (!sql.trim().isEmpty()) {
+ sql = sql.trim();
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ log.info("Executing statement:\n" + sql);
+ ps.execute();
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return dataSource;
+ }
+}
diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java
new file mode 100644
index 000000000..e51a3b082
--- /dev/null
+++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java
@@ -0,0 +1,24 @@
+package org.onap.ccsdk.features.lib.rlock.testutils;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+public class FileUtil {
+
+ public static String read(String fileName) throws Exception {
+ String ss = "";
+ try (InputStream is = DbUtil.class.getResourceAsStream(fileName)) {
+ try (InputStreamReader isr = new InputStreamReader(is)) {
+ try (BufferedReader in = new BufferedReader(isr)) {
+ String s = in.readLine();
+ while (s != null) {
+ ss += s + '\n';
+ s = in.readLine();
+ }
+ }
+ }
+ }
+ return ss;
+ }
+}
diff --git a/lib/rlock/src/test/resources/schema.sql b/lib/rlock/src/test/resources/schema.sql
new file mode 100644
index 000000000..26f38f684
--- /dev/null
+++ b/lib/rlock/src/test/resources/schema.sql
@@ -0,0 +1,10 @@
+CREATE TABLE IF NOT EXISTS `resource_lock` (
+ `resource_lock_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+ `resource_name` varchar(256),
+ `lock_holder` varchar(100) NOT NULL,
+ `lock_count` smallint(6) NOT NULL,
+ `lock_time` datetime NOT NULL,
+ `expiration_time` datetime NOT NULL,
+ PRIMARY KEY (`resource_lock_id`),
+ UNIQUE KEY `IX1_RESOURCE_LOCK` (`resource_name`)
+);