aboutsummaryrefslogtreecommitdiffstats
path: root/lib/doorman
diff options
context:
space:
mode:
authorDan Timoney <dtimoney@att.com>2020-04-10 13:23:51 +0000
committerGerrit Code Review <gerrit@onap.org>2020-04-10 13:23:51 +0000
commit285e59c81eef63b53e4089d2b9f5f6204becae38 (patch)
tree1f12da98320d00a9836a53d2daf2e0d905daf091 /lib/doorman
parent62edf48439a0a6b59619c03280b96c1226ad77f0 (diff)
parent5a893ea54e5f8a251be4bb774a9e187ee366de6c (diff)
Merge "Addressing issues from review"
Diffstat (limited to 'lib/doorman')
-rw-r--r--lib/doorman/pom.xml13
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java4
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java4
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java2
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java2
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java2
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java18
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java563
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java2
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java2
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java129
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java164
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java2
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java41
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java22
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java2
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java25
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java667
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java86
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java602
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java398
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java532
-rw-r--r--lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java622
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java150
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java378
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java184
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java21
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java4
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java21
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java75
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java150
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java30
-rw-r--r--lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java23
33 files changed, 2549 insertions, 2391 deletions
diff --git a/lib/doorman/pom.xml b/lib/doorman/pom.xml
index e7883a944..1e902d6aa 100644
--- a/lib/doorman/pom.xml
+++ b/lib/doorman/pom.xml
@@ -1,36 +1,35 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <parent>
+ <parent>
<groupId>org.onap.ccsdk.features</groupId>
<artifactId>ccsdk-features</artifactId>
<version>1.0.0-SNAPSHOT</version>
- </parent>
+ <relativePath>../..</relativePath>
+ </parent>
<groupId>org.onap.ccsdk.features.lib.doorman</groupId>
<artifactId>doorman</artifactId>
- <description>Doorman - Request prioritization and agregation queue</description>
+ <description>Doorman - Request prioritization and aggregation queue</description>
<dependencies>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.onap.ccsdk.features.lib.rlock</groupId>
<artifactId>rlock</artifactId>
<version>1.0.0-SNAPSHOT</version>
- <scope>provided</scope>
</dependency>
<dependency>
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java
index b6af1731e..e31553c4d 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java
@@ -5,7 +5,7 @@ import org.onap.ccsdk.features.lib.doorman.data.Queue;
public interface MessageClassifier {
- Queue determineQueue(MessageData request);
+ Queue determineQueue(MessageData request);
- String getExtMessageId(MessageData request);
+ String getExtMessageId(MessageData request);
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java
index 1ff811baa..36dacc64e 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java
@@ -4,7 +4,7 @@ import org.onap.ccsdk.features.lib.doorman.data.MessageData;
public interface MessageInterceptor {
- MessageData processRequest(MessageData request);
+ MessageData processRequest(MessageData request);
- void processResponse(MessageData response);
+ void processResponse(MessageData response);
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java
index 4c9886425..124f2f471 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java
@@ -2,5 +2,5 @@ package org.onap.ccsdk.features.lib.doorman;
public interface MessageInterceptorFactory {
- MessageInterceptor create();
+ MessageInterceptor create();
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java
index 0bb05a34c..7797ee095 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java
@@ -4,5 +4,5 @@ import org.onap.ccsdk.features.lib.doorman.data.MessageData;
public interface MessageProcessor {
- void processMessage(MessageData request);
+ void processMessage(MessageData request);
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java
index 4285393a4..55d7e0de9 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java
@@ -8,5 +8,5 @@ import org.onap.ccsdk.features.lib.doorman.data.MessageAction;
public interface MessageQueueHandler {
- Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue);
+ Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue);
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java
index 764faa9c8..552635c57 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java
@@ -10,21 +10,21 @@ import org.onap.ccsdk.features.lib.doorman.data.Queue;
public interface MessageDao {
- long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp);
+ long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp);
- void updateMessageStarted(long messageId, Date timestamp);
+ void updateMessageStarted(long messageId, Date timestamp);
- void updateMessageCompleted(long messageId, String resolution, Date timestamp);
+ void updateMessageCompleted(long messageId, String resolution, Date timestamp);
- void updateMessageResponse(long messageId, Date timestamp, MessageData response);
+ void updateMessageResponse(long messageId, Date timestamp, MessageData response);
- void addStatus(long messageId, MessageStatus status);
+ void addStatus(long messageId, MessageStatus status);
- void addAction(long messageId, MessageAction action);
+ void addAction(long messageId, MessageAction action);
- void updateActionDone(long actionId, Date now);
+ void updateActionDone(long actionId, Date now);
- List<Message> readMessageQueue(Queue queue);
+ List<Message> readMessageQueue(Queue queue);
- MessageAction getNextAction(long messageId);
+ MessageAction getNextAction(long messageId);
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java
index b3ecf3d4b..f04ea6259 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java
@@ -24,284 +24,313 @@ import org.onap.ccsdk.features.lib.doorman.util.JsonUtil;
public class MessageDaoImpl implements MessageDao {
- private DataSource dataSource;
+ private DataSource dataSource;
- @Override
- public long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp) {
- try (Connection con = dataSource.getConnection()) {
- try {
- con.setAutoCommit(false);
- long id = 0;
- String sql = "INSERT INTO message (ext_message_id, request_param, request_body, arrived_timestamp, queue_type, queue_id) VALUES (?, ?, ?, ?, ?, ?)";
- try (PreparedStatement ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
- ps.setString(1, extMessageId);
- ps.setString(2, JsonUtil.dataToJson(request.param));
- ps.setString(3, request.body);
- ps.setTimestamp(4, new Timestamp(timestamp.getTime()));
- if (queue != null) {
- ps.setString(5, queue.type);
- ps.setString(6, queue.id);
- } else {
- ps.setNull(5, Types.VARCHAR);
- ps.setNull(6, Types.VARCHAR);
- }
- ps.executeUpdate();
- try (ResultSet rs = ps.getGeneratedKeys()) {
- rs.next();
- id = rs.getLong(1);
- }
- }
- con.commit();
- return id;
- } catch (SQLException ex) {
- con.rollback();
- throw ex;
- }
- } catch (SQLException e) {
- throw new RuntimeException("Error inserting message to DB: " + e.getMessage(), e);
- }
- }
+ @Override
+ public long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp) {
+ try (Connection con = dataSource.getConnection()) {
+ try {
+ con.setAutoCommit(false);
+ long id = 0;
+ String sql = "INSERT INTO message (\n"
+ + " ext_message_id, request_param, request_body, arrived_timestamp, queue_type, queue_id)\n"
+ + "VALUES (?, ?, ?, ?, ?, ?)";
+ try (PreparedStatement ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
+ ps.setString(1, extMessageId);
+ ps.setString(2, JsonUtil.dataToJson(request.getParam()));
+ ps.setString(3, request.getBody());
+ ps.setTimestamp(4, new Timestamp(timestamp.getTime()));
+ if (queue != null) {
+ ps.setString(5, queue.getType());
+ ps.setString(6, queue.getId());
+ } else {
+ ps.setNull(5, Types.VARCHAR);
+ ps.setNull(6, Types.VARCHAR);
+ }
+ ps.executeUpdate();
+ try (ResultSet rs = ps.getGeneratedKeys()) {
+ rs.next();
+ id = rs.getLong(1);
+ }
+ }
+ con.commit();
+ return id;
+ } catch (SQLException ex) {
+ con.rollback();
+ throw ex;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error inserting message to DB: " + e.getMessage(), e);
+ }
+ }
- @Override
- public void updateMessageStarted(long messageId, Date timestamp) {
- updateMessageStatus("started_timestamp", messageId, null, timestamp);
- }
+ @Override
+ public void updateMessageStarted(long messageId, Date timestamp) {
+ updateMessageStatus("started_timestamp", messageId, null, timestamp);
+ }
- @Override
- public void updateMessageCompleted(long messageId, String resolution, Date timestamp) {
- updateMessageStatus("completed_timestamp", messageId, resolution, timestamp);
- }
+ @Override
+ public void updateMessageCompleted(long messageId, String resolution, Date timestamp) {
+ updateMessageStatus("completed_timestamp", messageId, resolution, timestamp);
+ }
- private void updateMessageStatus(String timestampColumn, long messageId, String resolution, Date timestamp) {
- try (Connection con = dataSource.getConnection()) {
- try {
- con.setAutoCommit(false);
- String sql = "UPDATE message SET " + timestampColumn + " = ? WHERE message_id = ?";
- try (PreparedStatement ps = con.prepareStatement(sql)) {
- ps.setTimestamp(1, new Timestamp(timestamp.getTime()));
- ps.setLong(2, messageId);
- ps.executeUpdate();
- }
- con.commit();
- } catch (SQLException ex) {
- con.rollback();
- throw ex;
- }
- } catch (SQLException e) {
- throw new RuntimeException("Error updating message status in DB: " + e.getMessage(), e);
- }
- }
+ private void updateMessageStatus(String timestampColumn, long messageId, String resolution, Date timestamp) {
+ try (Connection con = dataSource.getConnection()) {
+ try {
+ con.setAutoCommit(false);
+ String sql = "UPDATE message SET " + timestampColumn + " = ? WHERE message_id = ?";
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setTimestamp(1, new Timestamp(timestamp.getTime()));
+ ps.setLong(2, messageId);
+ ps.executeUpdate();
+ }
+ con.commit();
+ } catch (SQLException ex) {
+ con.rollback();
+ throw ex;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error updating message status in DB: " + e.getMessage(), e);
+ }
+ }
- @Override
- public void updateMessageResponse(long messageId, Date timestamp, MessageData response) {
- try (Connection con = dataSource.getConnection()) {
- try {
- con.setAutoCommit(false);
- String sql = "UPDATE message SET response_timestamp = ?, response_param = ?, response_body = ? WHERE message_id = ?";
- try (PreparedStatement ps = con.prepareStatement(sql)) {
- ps.setTimestamp(1, new Timestamp(timestamp.getTime()));
- ps.setString(2, JsonUtil.dataToJson(response.param));
- ps.setString(3, response.body);
- ps.setLong(4, messageId);
- ps.executeUpdate();
- }
- con.commit();
- } catch (SQLException ex) {
- con.rollback();
- throw ex;
- }
- } catch (SQLException e) {
- throw new RuntimeException("Error updating message response in DB: " + e.getMessage(), e);
- }
- }
+ @Override
+ public void updateMessageResponse(long messageId, Date timestamp, MessageData response) {
+ try (Connection con = dataSource.getConnection()) {
+ try {
+ con.setAutoCommit(false);
+ String sql = "UPDATE message SET response_timestamp = ?, response_param = ?, response_body = ?\n"
+ + "WHERE message_id = ?";
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setTimestamp(1, new Timestamp(timestamp.getTime()));
+ ps.setString(2, JsonUtil.dataToJson(response.getParam()));
+ ps.setString(3, response.getBody());
+ ps.setLong(4, messageId);
+ ps.executeUpdate();
+ }
+ con.commit();
+ } catch (SQLException ex) {
+ con.rollback();
+ throw ex;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error updating message response in DB: " + e.getMessage(), e);
+ }
+ }
- @Override
- public void addStatus(long messageId, MessageStatus status) {
- try (Connection con = dataSource.getConnection()) {
- try {
- con.setAutoCommit(false);
- String sql = "INSERT INTO message_status (message_id, status, status_timestamp) VALUES (?, ?, ?)";
- try (PreparedStatement ps = con.prepareStatement(sql)) {
- ps.setLong(1, messageId);
- ps.setString(2, status.status.toString());
- ps.setTimestamp(3, new Timestamp(status.timestamp.getTime()));
- ps.executeUpdate();
- }
- con.commit();
- } catch (SQLException ex) {
- con.rollback();
- throw ex;
- }
- } catch (SQLException e) {
- throw new RuntimeException("Error inserting message status to DB: " + e.getMessage(), e);
- }
- }
+ @Override
+ public void addStatus(long messageId, MessageStatus status) {
+ try (Connection con = dataSource.getConnection()) {
+ try {
+ con.setAutoCommit(false);
+ String sql = "INSERT INTO message_status (message_id, status, status_timestamp) VALUES (?, ?, ?)";
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setLong(1, messageId);
+ ps.setString(2, status.getStatus().toString());
+ ps.setTimestamp(3, new Timestamp(status.getTimestamp().getTime()));
+ ps.executeUpdate();
+ }
+ con.commit();
+ } catch (SQLException ex) {
+ con.rollback();
+ throw ex;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error inserting message status to DB: " + e.getMessage(), e);
+ }
+ }
- @Override
- public void addAction(long messageId, MessageAction action) {
- try (Connection con = dataSource.getConnection()) {
- try {
- con.setAutoCommit(false);
- String sql = "INSERT INTO message_action (message_id, action, action_status, resolution, action_timestamp, done_timestamp, hold_time, response_param, response_body) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
- try (PreparedStatement ps = con.prepareStatement(sql)) {
- ps.setLong(1, messageId);
- ps.setString(2, action.action.toString());
- ps.setString(3, action.actionStatus.toString());
- ps.setString(4, action.resolution);
- ps.setTimestamp(5, new Timestamp(action.timestamp.getTime()));
- if (action.doneTimestamp != null) {
- ps.setTimestamp(6, new Timestamp(action.doneTimestamp.getTime()));
- } else {
- ps.setNull(6, Types.TIMESTAMP);
- }
- ps.setInt(7, action.holdTime);
- if (action.returnResponse != null) {
- ps.setString(8, JsonUtil.dataToJson(action.returnResponse.param));
- ps.setString(9, action.returnResponse.body);
- } else {
- ps.setNull(8, Types.VARCHAR);
- ps.setNull(9, Types.VARCHAR);
- }
- ps.executeUpdate();
- }
- con.commit();
- } catch (SQLException ex) {
- con.rollback();
- throw ex;
- }
- } catch (SQLException e) {
- throw new RuntimeException("Error inserting message action to DB: " + e.getMessage(), e);
- }
- }
+ @Override
+ public void addAction(long messageId, MessageAction action) {
+ try (Connection con = dataSource.getConnection()) {
+ try {
+ con.setAutoCommit(false);
+ String sql = "INSERT INTO message_action (\n"
+ + " message_id, action, action_status, resolution, action_timestamp,\n"
+ + " done_timestamp, hold_time, response_param, response_body)\n"
+ + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setLong(1, messageId);
+ ps.setString(2, action.getAction().toString());
+ ps.setString(3, action.getActionStatus().toString());
+ ps.setString(4, action.getResolution());
+ ps.setTimestamp(5, new Timestamp(action.getTimestamp().getTime()));
+ if (action.getDoneTimestamp() != null) {
+ ps.setTimestamp(6, new Timestamp(action.getDoneTimestamp().getTime()));
+ } else {
+ ps.setNull(6, Types.TIMESTAMP);
+ }
+ ps.setInt(7, action.getHoldTime());
+ if (action.getReturnResponse() != null) {
+ ps.setString(8, JsonUtil.dataToJson(action.getReturnResponse().getParam()));
+ ps.setString(9, action.getReturnResponse().getBody());
+ } else {
+ ps.setNull(8, Types.VARCHAR);
+ ps.setNull(9, Types.VARCHAR);
+ }
+ ps.executeUpdate();
+ }
+ con.commit();
+ } catch (SQLException ex) {
+ con.rollback();
+ throw ex;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error inserting message action to DB: " + e.getMessage(), e);
+ }
+ }
- @Override
- public void updateActionDone(long actionId, Date now) {
- try (Connection con = dataSource.getConnection()) {
- try {
- con.setAutoCommit(false);
- String sql = "UPDATE message_action SET action_status = ?, done_timestamp = ? WHERE message_action_id = ?";
- try (PreparedStatement ps = con.prepareStatement(sql)) {
- ps.setString(1, ActionStatus.DONE.toString());
- ps.setTimestamp(2, new Timestamp(now.getTime()));
- ps.setLong(3, actionId);
- ps.executeUpdate();
- }
- con.commit();
- } catch (SQLException ex) {
- con.rollback();
- throw ex;
- }
- } catch (SQLException e) {
- throw new RuntimeException("Error updating action in DB: " + e.getMessage(), e);
- }
- }
+ @Override
+ public void updateActionDone(long actionId, Date now) {
+ try (Connection con = dataSource.getConnection()) {
+ try {
+ con.setAutoCommit(false);
+ String sql =
+ "UPDATE message_action SET action_status = ?, done_timestamp = ? WHERE message_action_id = ?";
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setString(1, ActionStatus.DONE.toString());
+ ps.setTimestamp(2, new Timestamp(now.getTime()));
+ ps.setLong(3, actionId);
+ ps.executeUpdate();
+ }
+ con.commit();
+ } catch (SQLException ex) {
+ con.rollback();
+ throw ex;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error updating action in DB: " + e.getMessage(), e);
+ }
+ }
- @SuppressWarnings("unchecked")
- @Override
- public List<Message> readMessageQueue(Queue queue) {
- List<Message> messageList = new ArrayList<>();
- try (Connection con = dataSource.getConnection()) {
- String msql = "SELECT * FROM message WHERE queue_type = ? AND queue_id = ?";
- String ssql = "SELECT * FROM message_status WHERE message_id = ? ORDER BY message_status_id DESC";
- String asql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY message_action_id DESC";
- try (PreparedStatement mps = con.prepareStatement(msql); PreparedStatement sps = con.prepareStatement(ssql); PreparedStatement aps = con.prepareStatement(asql)) {
- mps.setString(1, queue.type);
- mps.setString(2, queue.id);
- try (ResultSet mrs = mps.executeQuery()) {
- while (mrs.next()) {
- Message m = new Message();
- m.messageId = mrs.getLong("message_id");
- m.extMessageId = mrs.getString("ext_message_id");
- m.request = new MessageData();
- m.request.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("request_param"));
- m.request.body = mrs.getString("request_body");
- m.response = new MessageData();
- m.response.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("response_param"));
- m.response.body = mrs.getString("response_body");
- m.queue = new Queue();
- m.queue.type = mrs.getString("queue_type");
- m.queue.id = mrs.getString("queue_id");
- m.arrivedTimestamp = mrs.getTimestamp("arrived_timestamp");
- m.startedTimestamp = mrs.getTimestamp("started_timestamp");
- m.completedTimestamp = mrs.getTimestamp("completed_timestamp");
- m.responseTimestamp = mrs.getTimestamp("response_timestamp");
- m.statusHistory = new ArrayList<>();
- m.actionHistory = new ArrayList<>();
- messageList.add(m);
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<Message> readMessageQueue(Queue queue) {
+ List<Message> messageList = new ArrayList<>();
+ try (Connection con = dataSource.getConnection()) {
+ String msql = "SELECT * FROM message WHERE queue_type = ? AND queue_id = ?";
+ String ssql = "SELECT * FROM message_status WHERE message_id = ? ORDER BY message_status_id DESC";
+ String asql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY message_action_id DESC";
+ try (PreparedStatement mps = con.prepareStatement(msql);
+ PreparedStatement sps = con.prepareStatement(ssql);
+ PreparedStatement aps = con.prepareStatement(asql)) {
+ mps.setString(1, queue.getType());
+ mps.setString(2, queue.getId());
+ try (ResultSet mrs = mps.executeQuery()) {
+ while (mrs.next()) {
+ long messageId = mrs.getLong("message_id");
+ String extMessageId = mrs.getString("ext_message_id");
- sps.setLong(1, m.messageId);
- try (ResultSet srs = sps.executeQuery()) {
- while (srs.next()) {
- MessageStatus s = new MessageStatus();
- s.status = MessageStatusValue.valueOf(srs.getString("status"));
- s.timestamp = srs.getTimestamp("status_timestamp");
- m.statusHistory.add(s);
- }
- }
+ Map<String, Object> requestParam =
+ (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("request_param"));
+ String requestBody = mrs.getString("request_body");
+ MessageData request = null;
+ if (requestParam != null || requestBody != null) {
+ request = new MessageData(requestParam, requestBody);
+ }
- aps.setLong(1, m.messageId);
- try (ResultSet ars = aps.executeQuery()) {
- while (ars.next()) {
- MessageAction a = new MessageAction();
- a.actionId = ars.getLong("message_action_id");
- a.action = MessageActionValue.valueOf(ars.getString("action"));
- a.actionStatus = ActionStatus.valueOf(ars.getString("action_status"));
- a.timestamp = ars.getTimestamp("action_timestamp");
- a.doneTimestamp = ars.getTimestamp("done_timestamp");
- a.holdTime = ars.getInt("hold_time");
- a.returnResponse = new MessageData();
- a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(ars.getString("response_param"));
- a.returnResponse.body = ars.getString("response_body");
- if (a.returnResponse.param == null && a.returnResponse.body == null) {
- a.returnResponse = null;
- }
- a.resolution = ars.getString("resolution");
- m.actionHistory.add(a);
- }
- }
- }
- }
- }
- } catch (SQLException e) {
- throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e);
- }
- return messageList;
- }
+ Map<String, Object> responseParam =
+ (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("response_param"));
+ String responseBody = mrs.getString("response_body");
+ MessageData response = null;
+ if (responseParam != null || responseBody != null) {
+ response = new MessageData(responseParam, responseBody);
+ }
- @SuppressWarnings("unchecked")
- @Override
- public MessageAction getNextAction(long messageId) {
- try (Connection con = dataSource.getConnection()) {
- String sql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY action_timestamp DESC";
- try (PreparedStatement ps = con.prepareStatement(sql)) {
- ps.setLong(1, messageId);
- try (ResultSet rs = ps.executeQuery()) {
- if (rs.next()) {
- MessageAction a = new MessageAction();
- a.actionId = rs.getLong("message_action_id");
- a.action = MessageActionValue.valueOf(rs.getString("action"));
- a.actionStatus = ActionStatus.valueOf(rs.getString("action_status"));
- a.timestamp = rs.getTimestamp("action_timestamp");
- a.doneTimestamp = rs.getTimestamp("done_timestamp");
- a.holdTime = rs.getInt("hold_time");
- a.returnResponse = new MessageData();
- a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(rs.getString("response_param"));
- a.returnResponse.body = rs.getString("response_body");
- if (a.returnResponse.param == null && a.returnResponse.body == null) {
- a.returnResponse = null;
- }
- a.resolution = rs.getString("resolution");
- return a;
- }
- return null;
- }
- }
- } catch (SQLException e) {
- throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e);
- }
- }
+ Date arrivedTimestamp = mrs.getTimestamp("arrived_timestamp");
+ Date startedTimestamp = mrs.getTimestamp("started_timestamp");
+ Date completedTimestamp = mrs.getTimestamp("completed_timestamp");
+ Date responseTimestamp = mrs.getTimestamp("response_timestamp");
- public void setDataSource(DataSource dataSource) {
- this.dataSource = dataSource;
- }
+ List<MessageStatus> statusHistory = new ArrayList<>();
+ sps.setLong(1, messageId);
+ try (ResultSet srs = sps.executeQuery()) {
+ while (srs.next()) {
+ MessageStatusValue status = MessageStatusValue.valueOf(srs.getString("status"));
+ Date timestamp = srs.getTimestamp("status_timestamp");
+ statusHistory.add(new MessageStatus(status, timestamp));
+ }
+ }
+
+ List<MessageAction> actionHistory = new ArrayList<>();
+ aps.setLong(1, messageId);
+ try (ResultSet ars = aps.executeQuery()) {
+ while (ars.next()) {
+ long actionId = ars.getLong("message_action_id");
+ MessageActionValue action = MessageActionValue.valueOf(ars.getString("action"));
+ ActionStatus actionStatus = ActionStatus.valueOf(ars.getString("action_status"));
+ String resolution = ars.getString("resolution");
+ Date timestamp = ars.getTimestamp("action_timestamp");
+ Date doneTimestamp = ars.getTimestamp("done_timestamp");
+ Integer holdTimeO = ars.getInt("hold_time");
+ int holdTime = holdTimeO != null ? holdTimeO : 0;
+
+ Map<String, Object> returnResponseParam =
+ (Map<String, Object>) JsonUtil.jsonToData(ars.getString("response_param"));
+ String returnResponseBody = ars.getString("response_body");
+ MessageData returnResponse = null;
+ if (returnResponseParam != null || returnResponseBody != null) {
+ returnResponse = new MessageData(returnResponseParam, returnResponseBody);
+ }
+
+ MessageAction a = new MessageAction(actionId, action, actionStatus, resolution,
+ timestamp, doneTimestamp, holdTime, returnResponse);
+ actionHistory.add(a);
+ }
+ }
+
+ Message m = new Message(messageId, extMessageId, request, response, arrivedTimestamp,
+ startedTimestamp, completedTimestamp, responseTimestamp, queue, statusHistory,
+ actionHistory);
+ messageList.add(m);
+ }
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e);
+ }
+ return messageList;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public MessageAction getNextAction(long messageId) {
+ try (Connection con = dataSource.getConnection()) {
+ String sql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY action_timestamp DESC";
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ ps.setLong(1, messageId);
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ long actionId = rs.getLong("message_action_id");
+ MessageActionValue action = MessageActionValue.valueOf(rs.getString("action"));
+ ActionStatus actionStatus = ActionStatus.valueOf(rs.getString("action_status"));
+ String resolution = rs.getString("resolution");
+ Date timestamp = rs.getTimestamp("action_timestamp");
+ Date doneTimestamp = rs.getTimestamp("done_timestamp");
+ Integer holdTimeO = rs.getInt("hold_time");
+ int holdTime = holdTimeO != null ? holdTimeO : 0;
+
+ Map<String, Object> returnResponseParam =
+ (Map<String, Object>) JsonUtil.jsonToData(rs.getString("response_param"));
+ String returnResponseBody = rs.getString("response_body");
+ MessageData returnResponse = null;
+ if (returnResponseParam != null || returnResponseBody != null) {
+ returnResponse = new MessageData(returnResponseParam, returnResponseBody);
+ }
+
+ MessageAction a = new MessageAction(actionId, action, actionStatus, resolution, timestamp,
+ doneTimestamp, holdTime, returnResponse);
+ return a;
+ }
+ return null;
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e);
+ }
+ }
+
+ public void setDataSource(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java
index 4aa016575..17828f453 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java
@@ -1,5 +1,5 @@
package org.onap.ccsdk.features.lib.doorman.data;
public enum ActionStatus {
- PENDING, DONE
+ PENDING, DONE
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java
index 9960eefbf..557539722 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java
@@ -1,5 +1,5 @@
package org.onap.ccsdk.features.lib.doorman.data;
public enum Event {
- ARRIVED, COMPLETED, AWAKEN, CHECK
+ ARRIVED, COMPLETED, AWAKEN, CHECK
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java
index 1f0b11384..a1a6ef22f 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java
@@ -1,33 +1,112 @@
package org.onap.ccsdk.features.lib.doorman.data;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
public class Message {
- public long messageId;
- public String extMessageId;
- public MessageData request;
- public MessageData response;
- public Date arrivedTimestamp;
- public Date startedTimestamp;
- public Date completedTimestamp;
- public Date responseTimestamp;
- public Queue queue;
-
- public List<MessageStatus> statusHistory;
- public List<MessageAction> actionHistory;
-
- @Override
- public String toString() {
- StringBuilder ss = new StringBuilder();
- ss.append(messageId);
- if (extMessageId != null) {
- ss.append("::").append(extMessageId);
- }
- if (queue != null) {
- ss.append("::").append(queue);
- }
- return ss.toString();
- }
+ private long messageId;
+ private String extMessageId;
+ private MessageData request;
+ private MessageData response;
+ private Date arrivedTimestamp;
+ private Date startedTimestamp;
+ private Date completedTimestamp;
+ private Date responseTimestamp;
+ private Queue queue;
+
+ private List<MessageStatus> statusHistory;
+ private List<MessageAction> actionHistory;
+
+ public Message(long messageId, String extMessageId, MessageData request, Queue queue) {
+ this.messageId = messageId;
+ this.extMessageId = extMessageId;
+ this.request = request;
+ this.queue = queue;
+ }
+
+ public Message(long messageId, String extMessageId, MessageData request, MessageData response,
+ Date arrivedTimestamp, Date startedTimestamp, Date completedTimestamp, Date responseTimestamp, Queue queue,
+ List<MessageStatus> statusHistory, List<MessageAction> actionHistory) {
+ this.messageId = messageId;
+ this.extMessageId = extMessageId;
+ this.request = request;
+ this.response = response;
+ this.arrivedTimestamp = arrivedTimestamp;
+ this.startedTimestamp = startedTimestamp;
+ this.completedTimestamp = completedTimestamp;
+ this.responseTimestamp = responseTimestamp;
+ this.queue = queue;
+ this.statusHistory = new ArrayList<>(statusHistory);
+ this.actionHistory = new ArrayList<>(actionHistory);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder ss = new StringBuilder();
+ ss.append(messageId);
+ if (extMessageId != null) {
+ ss.append("::").append(extMessageId);
+ }
+ if (queue != null) {
+ ss.append("::").append(queue);
+ }
+ return ss.toString();
+ }
+
+ public int getTimeInQueue(Date now) {
+ // Find the elapsed time since message arrived. That will be the timestamp of the first
+ // status of the message (last status in the status history list)
+ if (statusHistory != null && !statusHistory.isEmpty()) {
+ MessageStatus receivedStatus = statusHistory.get(statusHistory.size() - 1);
+ return (int) ((now.getTime() - receivedStatus.getTimestamp().getTime()) / 1000);
+ }
+ return 0;
+ }
+
+ public long getMessageId() {
+ return messageId;
+ }
+
+ public String getExtMessageId() {
+ return extMessageId;
+ }
+
+ public MessageData getRequest() {
+ return request;
+ }
+
+ public MessageData getResponse() {
+ return response;
+ }
+
+ public Date getArrivedTimestamp() {
+ return arrivedTimestamp;
+ }
+
+ public Date getStartedTimestamp() {
+ return startedTimestamp;
+ }
+
+ public Date getCompletedTimestamp() {
+ return completedTimestamp;
+ }
+
+ public Date getResponseTimestamp() {
+ return responseTimestamp;
+ }
+
+ public Queue getQueue() {
+ return queue;
+ }
+
+ public List<MessageStatus> getStatusHistory() {
+ return Collections.unmodifiableList(statusHistory);
+ }
+
+ public List<MessageAction> getActionHistory() {
+ return Collections.unmodifiableList(actionHistory);
+ }
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java
index 319fc68ff..e9657ca25 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java
@@ -4,56 +4,116 @@ import java.util.Date;
public class MessageAction {
- public long actionId;
- public MessageActionValue action;
- public ActionStatus actionStatus;
- public String resolution;
- public Date timestamp;
- public Date doneTimestamp;
- public int holdTime; // in seconds
- public MessageData returnResponse;
-
- @Override
- public String toString() {
- return action.toString() + " | " + actionStatus + " | " + resolution + " | " + holdTime + " | " + returnResponse;
- }
-
- public boolean same(MessageAction a2) {
- if (action != a2.action) {
- return false;
- }
- if (action == MessageActionValue.HOLD || action == MessageActionValue.RETURN_HOLD) {
- if (holdTime != a2.holdTime) {
- return false;
- }
- }
- if (action == MessageActionValue.RETURN_COMPLETE || action == MessageActionValue.RETURN_HOLD || action == MessageActionValue.RETURN_PROCESS) {
- if (returnResponse == null != (a2.returnResponse == null)) {
- return false;
- }
- if (returnResponse != null) {
- if (returnResponse.param == null != (a2.returnResponse.param == null)) {
- return false;
- }
- if (returnResponse.param != null && !returnResponse.param.equals(a2.returnResponse.param)) {
- return false;
- }
- if (returnResponse.body == null != (a2.returnResponse.body == null)) {
- return false;
- }
- if (returnResponse.body != null && !returnResponse.body.equals(a2.returnResponse.body)) {
- return false;
- }
- }
- }
- if (action == MessageActionValue.RETURN_COMPLETE) {
- if (resolution == null != (a2.resolution == null)) {
- return false;
- }
- if (resolution != null && !resolution.equals(a2.resolution)) {
- return false;
- }
- }
- return true;
- }
+ private long actionId;
+ private MessageActionValue action;
+ private ActionStatus actionStatus;
+ private String resolution;
+ private Date timestamp;
+ private Date doneTimestamp;
+ private int holdTime; // in seconds
+ private MessageData returnResponse;
+
+ public MessageAction(long actionId, MessageActionValue action, ActionStatus actionStatus, String resolution,
+ Date timestamp, Date doneTimestamp, int holdTime, MessageData returnResponse) {
+ this.actionId = actionId;
+ this.action = action;
+ this.actionStatus = actionStatus;
+ this.resolution = resolution;
+ this.timestamp = timestamp;
+ this.doneTimestamp = doneTimestamp;
+ this.holdTime = holdTime;
+ this.returnResponse = returnResponse;
+ }
+
+ public MessageAction(MessageActionValue action, String resolution, int holdTime, MessageData returnResponse) {
+ this.action = action;
+ actionStatus = ActionStatus.PENDING;
+ this.resolution = resolution;
+ timestamp = new Date();
+ this.holdTime = holdTime;
+ this.returnResponse = returnResponse;
+ }
+
+ public void setDone() {
+ actionStatus = ActionStatus.DONE;
+ doneTimestamp = new Date();
+ }
+
+ @Override
+ public String toString() {
+ return action.toString() + " | " + actionStatus + " | " + resolution + " | " + holdTime + " | "
+ + returnResponse;
+ }
+
+ public boolean same(MessageAction a2) {
+ if (action != a2.action) {
+ return false;
+ }
+ if (action == MessageActionValue.HOLD || action == MessageActionValue.RETURN_HOLD) {
+ if (holdTime != a2.holdTime) {
+ return false;
+ }
+ }
+ if (action == MessageActionValue.RETURN_COMPLETE || action == MessageActionValue.RETURN_HOLD
+ || action == MessageActionValue.RETURN_PROCESS) {
+ if (returnResponse == null != (a2.returnResponse == null)) {
+ return false;
+ }
+ if (returnResponse != null) {
+ if (returnResponse.getParam() == null != (a2.returnResponse.getParam() == null)) {
+ return false;
+ }
+ if (returnResponse.getParam() != null && !returnResponse.getParam().equals(a2.returnResponse.getParam())) {
+ return false;
+ }
+ if (returnResponse.getBody() == null != (a2.returnResponse.getBody() == null)) {
+ return false;
+ }
+ if (returnResponse.getBody() != null && !returnResponse.getBody().equals(a2.returnResponse.getBody())) {
+ return false;
+ }
+ }
+ }
+ if (action == MessageActionValue.RETURN_COMPLETE) {
+ if (resolution == null != (a2.resolution == null)) {
+ return false;
+ }
+ if (resolution != null && !resolution.equals(a2.resolution)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public long getActionId() {
+ return actionId;
+ }
+
+ public MessageActionValue getAction() {
+ return action;
+ }
+
+ public ActionStatus getActionStatus() {
+ return actionStatus;
+ }
+
+ public String getResolution() {
+ return resolution;
+ }
+
+ public Date getTimestamp() {
+ return timestamp;
+ }
+
+ public Date getDoneTimestamp() {
+ return doneTimestamp;
+ }
+
+ public int getHoldTime() {
+ return holdTime;
+ }
+
+ public MessageData getReturnResponse() {
+ return returnResponse;
+ }
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java
index fd86d37c8..6cf47bc74 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java
@@ -1,5 +1,5 @@
package org.onap.ccsdk.features.lib.doorman.data;
public enum MessageActionValue {
- HOLD, PROCESS, RETURN_COMPLETE, RETURN_PROCESS, RETURN_HOLD
+ HOLD, PROCESS, RETURN_COMPLETE, RETURN_PROCESS, RETURN_HOLD
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java
index 1fa5f9a14..eabcc7729 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java
@@ -1,21 +1,36 @@
package org.onap.ccsdk.features.lib.doorman.data;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
public class MessageData {
- public Map<String, Object> param;
- public String body;
+ private Map<String, Object> param;
+ private String body;
- @Override
- public String toString() {
- StringBuilder ss = new StringBuilder();
- ss.append(param);
- String b = body;
- if (b != null && b.length() > 20) {
- b = b.substring(0, 20) + "...";
- }
- ss.append(b);
- return ss.toString();
- }
+ public MessageData(Map<String, Object> param, String body) {
+ this.param = new HashMap<>(param);
+ this.body = body;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder ss = new StringBuilder();
+ ss.append(param);
+ String b = body;
+ if (b != null && b.length() > 20) {
+ b = b.substring(0, 20) + "...";
+ }
+ ss.append(b);
+ return ss.toString();
+ }
+
+ public Map<String, Object> getParam() {
+ return Collections.unmodifiableMap(param);
+ }
+
+ public String getBody() {
+ return body;
+ }
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java
index e9af20be1..41dc6bf58 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java
@@ -4,6 +4,24 @@ import java.util.Date;
public class MessageStatus {
- public MessageStatusValue status;
- public Date timestamp;
+ private MessageStatusValue status;
+ private Date timestamp;
+
+ public MessageStatus(MessageStatusValue status, Date timestamp) {
+ this.status = status;
+ this.timestamp = timestamp;
+ }
+
+ public MessageStatus(MessageStatusValue status) {
+ this.status = status;
+ timestamp = new Date();
+ }
+
+ public MessageStatusValue getStatus() {
+ return status;
+ }
+
+ public Date getTimestamp() {
+ return timestamp;
+ }
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java
index af2bbf024..cd0257267 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java
@@ -1,5 +1,5 @@
package org.onap.ccsdk.features.lib.doorman.data;
public enum MessageStatusValue {
- ARRIVED, IN_QUEUE, PROCESSING_SYNC, PROCESSING_ASYNC, COMPLETED
+ ARRIVED, IN_QUEUE, PROCESSING_SYNC, PROCESSING_ASYNC, COMPLETED
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java
index 7fd83104a..cfdf137a2 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java
@@ -2,11 +2,24 @@ package org.onap.ccsdk.features.lib.doorman.data;
public class Queue {
- public String type;
- public String id;
+ private String type;
+ private String id;
- @Override
- public String toString() {
- return type + "::" + id;
- }
+ public Queue(String type, String id) {
+ this.type = type;
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return type + "::" + id;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public String getId() {
+ return id;
+ }
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java
index a81942771..125c3d089 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java
@@ -13,345 +13,342 @@ import org.onap.ccsdk.features.lib.doorman.data.Message;
import org.onap.ccsdk.features.lib.doorman.data.MessageAction;
import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue;
import org.onap.ccsdk.features.lib.doorman.data.MessageData;
-import org.onap.ccsdk.features.lib.doorman.data.MessageStatus;
import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageHandlerBaseImpl implements MessageQueueHandler {
- private static final Logger log = LoggerFactory.getLogger(MessageHandlerBaseImpl.class);
-
- private boolean async = false;
- private int maxParallelCount = 1;
- private int maxQueueSize = 0;
- private int timeToLive = 0; // in seconds
- private int maxTimeInQueue = 60; // in seconds
- private int updateWaitTime = 60; // in seconds
-
- @Override
- public Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue) {
- long t1 = System.currentTimeMillis();
- log.info(">>>>> Handler started for message: " + msg + ": " + event);
-
- PrioritizedMessage pmsg = null;
- List<PrioritizedMessage> processing = new ArrayList<>();
- List<PrioritizedMessage> waiting = new ArrayList<>();
- Date now = new Date();
- for (Message m : queue) {
- PrioritizedMessage pm = new PrioritizedMessage();
- pm.message = m;
- pm.priority = assignPriority(msg);
- pm.timeInQueue = getTimeInQueue(now, m);
- pm.updateGroup = determineUpdateGroup(m);
- pm.updateSequence = determineUpdateSequence(m);
-
- if (pm.message.messageId == msg.messageId) {
- pmsg = pm;
-
- if (event != Event.COMPLETED) {
- waiting.add(pm);
- }
- } else {
- MessageStatusValue s = m.statusHistory.get(0).status;
- if (s == MessageStatusValue.IN_QUEUE) {
- waiting.add(pm);
- } else if (s == MessageStatusValue.PROCESSING_SYNC || s == MessageStatusValue.PROCESSING_ASYNC) {
- processing.add(pm);
- }
- }
- }
-
- log.info(" Processing:");
- for (PrioritizedMessage pm : processing) {
- log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup + " | " + pm.updateSequence);
- }
- log.info(" Waiting:");
- for (PrioritizedMessage pm : waiting) {
- log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup + " | " + pm.updateSequence);
- }
-
- log.info(" Determined actions:");
-
- Collections.sort(waiting, (pm1, pm2) -> comparePrioritizedMessages(pm1, pm2));
-
- Map<Long, MessageAction> mm = new HashMap<>();
-
- List<PrioritizedMessage> skipList = findMessagesToSkip(processing, waiting);
- addNextActionComplete(mm, skipList, Resolution.SKIPPED, now);
-
- waiting.removeAll(skipList);
-
- List<PrioritizedMessage> expiredList = findExpiredMessages(processing, waiting);
- addNextActionComplete(mm, expiredList, Resolution.EXPIRED, now);
-
- waiting.removeAll(expiredList);
-
- List<PrioritizedMessage> dropList = findMessagesToDrop(processing, waiting);
- addNextActionComplete(mm, dropList, Resolution.DROPPED, now);
-
- waiting.removeAll(dropList);
-
- List<PrioritizedMessage> processList = findMessagesToProcess(processing, waiting);
- for (PrioritizedMessage pm : processList) {
- MessageAction a = new MessageAction();
- a.timestamp = now;
- if (async) {
- a.action = MessageActionValue.RETURN_PROCESS;
- a.returnResponse = ackResponse(pm.message);
- } else {
- a.action = MessageActionValue.PROCESS;
- }
- mm.put(pm.message.messageId, a);
- log.info(" -- Next action for message: " + pm.message + ": " + a.action);
- }
-
- if (event != Event.COMPLETED && !mm.containsKey(pmsg.message.messageId)) {
- MessageAction a = new MessageAction();
- a.timestamp = now;
- a.holdTime = pmsg.updateGroup != null ? updateWaitTime : maxTimeInQueue;
- if (async) {
- a.action = MessageActionValue.RETURN_HOLD;
- a.returnResponse = ackResponse(pmsg.message);
- } else {
- a.action = MessageActionValue.HOLD;
- }
- mm.put(pmsg.message.messageId, a);
- log.info(" -- Next action for message: " + pmsg.message + ": " + a.action + ": " + a.holdTime);
-
- waiting.remove(pmsg);
- }
-
- if (event == Event.COMPLETED) {
- MessageAction a = new MessageAction();
- a.timestamp = now;
- a.action = MessageActionValue.RETURN_COMPLETE;
- a.resolution = Resolution.PROCESSED.toString();
- a.returnResponse = completeResponse(Resolution.PROCESSED, msg);
- mm.put(pmsg.message.messageId, a);
- log.info(" -- Next action for message: " + pmsg.message + ": " + a.action + ": " + a.resolution);
- }
-
- long t2 = System.currentTimeMillis();
- log.info("<<<<< Handler completed for message: " + msg + ": " + event + ": Time: " + (t2 - t1));
- return mm;
- }
-
- private void addNextActionComplete(Map<Long, MessageAction> mm, List<PrioritizedMessage> skipList, Resolution r, Date now) {
- for (PrioritizedMessage pm : skipList) {
- MessageAction a = new MessageAction();
- a.action = MessageActionValue.RETURN_COMPLETE;
- a.resolution = r.toString();
- a.timestamp = now;
- a.returnResponse = completeResponse(r, pm.message);
- mm.put(pm.message.messageId, a);
- log.info(" -- Next action for message: " + pm.message + ": " + a.action + ": " + a.resolution);
- }
- }
-
- protected List<PrioritizedMessage> findMessagesToSkip(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) {
- List<PrioritizedMessage> ll = new ArrayList<>();
- Map<String, PrioritizedMessage> lastMap = new HashMap<>();
- for (PrioritizedMessage pm : waiting) {
- if (pm.updateGroup != null) {
- PrioritizedMessage last = lastMap.get(pm.updateGroup);
- if (last == null) {
- lastMap.put(pm.updateGroup, pm);
- } else {
- if (pm.updateSequence > last.updateSequence) {
- ll.add(last);
- lastMap.put(pm.updateGroup, pm);
- }
- }
- }
- }
- return ll;
- }
-
- protected List<PrioritizedMessage> findExpiredMessages(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) {
- List<PrioritizedMessage> ll = new ArrayList<>();
- if (timeToLive > 0) {
- for (PrioritizedMessage pm : waiting) {
- if (pm.timeInQueue > timeToLive) {
- ll.add(pm);
- }
- }
- }
- return ll;
- }
-
- protected List<PrioritizedMessage> findMessagesToDrop(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) {
- List<PrioritizedMessage> ll = new ArrayList<>();
- if (maxQueueSize > 0) {
- // Drop the least important messages (last in the prioritized waiting list)
- for (int i = maxQueueSize; i < waiting.size(); i++) {
- ll.add(waiting.get(i));
- }
- }
- return ll;
- }
-
- protected List<PrioritizedMessage> findMessagesToProcess(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) {
- List<PrioritizedMessage> ll = new ArrayList<>();
-
- if (processing.size() >= maxParallelCount) {
- return ll;
- }
-
- // Find messages allowed to be processed based on the concurrency rules
-
- List<List<String>> currentLocks = new ArrayList<>();
- for (PrioritizedMessage m : processing) {
- List<List<String>> locks = determineConcurency(m.message);
- if (locks != null) {
- currentLocks.addAll(locks);
- }
- }
-
- List<PrioritizedMessage> allowed = new ArrayList<>();
- for (PrioritizedMessage m : waiting) {
- List<List<String>> neededLocks = determineConcurency(m.message);
- if (allowed(currentLocks, neededLocks)) {
- allowed.add(m);
- }
- }
-
- // Remove messages that are waiting on hold
- Iterator<PrioritizedMessage> ii = allowed.iterator();
- while (ii.hasNext()) {
- PrioritizedMessage pm = ii.next();
- if (pm.updateGroup != null) {
- log.info(" --- Check: " + pm.message + ": " + pm.timeInQueue + " >= " + updateWaitTime);
- if (pm.timeInQueue < updateWaitTime) {
- ii.remove();
- }
- }
- }
-
- // Limit the number of processing messages to maxParallelCount
-
- for (int i = 0; i < allowed.size() && i < maxParallelCount - processing.size(); i++) {
- ll.add(allowed.get(i));
- }
-
- return ll;
- }
-
- private int comparePrioritizedMessages(PrioritizedMessage pm1, PrioritizedMessage pm2) {
- if (pm1.timeInQueue >= maxTimeInQueue && pm2.timeInQueue < maxTimeInQueue) {
- return -1;
- }
- if (pm1.timeInQueue < maxTimeInQueue && pm2.timeInQueue >= maxTimeInQueue) {
- return 1;
- }
- if (pm1.priority < pm2.priority) {
- return -1;
- }
- if (pm1.priority > pm2.priority) {
- return 1;
- }
- if (pm1.timeInQueue > pm2.timeInQueue) {
- return -1;
- }
- if (pm1.timeInQueue < pm2.timeInQueue) {
- return 1;
- }
- return 0;
- }
-
- private int getTimeInQueue(Date now, Message m) {
- // Find the elapsed time since message arrived. That will be the timestamp of the first
- // status of the message (last status in the status history list)
- MessageStatus receivedStatus = m.statusHistory.get(m.statusHistory.size() - 1);
- return (int) ((now.getTime() - receivedStatus.timestamp.getTime()) / 1000);
- }
-
- private boolean allowed(List<List<String>> currentLocks, List<List<String>> neededLocks) {
- if (neededLocks == null) {
- return true;
- }
- for (List<String> neededLockLevels : neededLocks) {
- for (List<String> currentLockLevels : currentLocks) {
- int n = neededLockLevels.size() < currentLockLevels.size() ? neededLockLevels.size() : currentLockLevels.size();
- boolean good = false;
- for (int i = 0; i < n; i++) {
- if (!neededLockLevels.get(i).equals(currentLockLevels.get(i))) {
- good = true;
- break;
- }
- }
- if (!good) {
- return false;
- }
- }
- }
- return true;
- }
-
- protected void initMessage(Message msg) {
- }
-
- protected void closeMessage(Message msg) {
- }
-
- protected long assignPriority(Message msg) {
- return msg.messageId; // FIFO by default
- }
-
- protected String determineUpdateGroup(Message msg) {
- return null;
- }
-
- protected long determineUpdateSequence(Message msg) {
- return msg.messageId;
- }
-
- protected List<List<String>> determineConcurency(Message msg) {
- return null;
- }
-
- protected MessageData ackResponse(Message msg) {
- return null;
- }
-
- protected MessageData completeResponse(Resolution r, Message msg) {
- return null;
- }
-
- protected static enum Resolution {
- PROCESSED, SKIPPED, EXPIRED, DROPPED
- }
-
- protected static class PrioritizedMessage {
-
- public Message message;
- public long priority;
- public int timeInQueue;
- public String updateGroup;
- public long updateSequence;
- }
-
- public void setMaxParallelCount(int maxParallelCount) {
- this.maxParallelCount = maxParallelCount;
- }
-
- public void setTimeToLive(int timeToLive) {
- this.timeToLive = timeToLive;
- }
-
- public void setMaxTimeInQueue(int maxTimeInQueue) {
- this.maxTimeInQueue = maxTimeInQueue;
- }
-
- public void setUpdateWaitTime(int updateWaitTime) {
- this.updateWaitTime = updateWaitTime;
- }
-
- public void setMaxQueueSize(int maxQueueSize) {
- this.maxQueueSize = maxQueueSize;
- }
-
- public void setAsync(boolean async) {
- this.async = async;
- }
+ private static final Logger log = LoggerFactory.getLogger(MessageHandlerBaseImpl.class);
+
+ private boolean async = false;
+ private int maxParallelCount = 1;
+ private int maxQueueSize = 0;
+ private int timeToLive = 0; // in seconds
+ private int maxTimeInQueue = 60; // in seconds
+ private int updateWaitTime = 60; // in seconds
+
+ @Override
+ public Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue) {
+ long t1 = System.currentTimeMillis();
+ log.info(">>>>> Handler started for message: " + msg + ": " + event);
+
+ PrioritizedMessage pmsg = null;
+ List<PrioritizedMessage> processing = new ArrayList<>();
+ List<PrioritizedMessage> waiting = new ArrayList<>();
+ Date now = new Date();
+ for (Message m : queue) {
+ PrioritizedMessage pm = new PrioritizedMessage();
+ pm.message = m;
+ pm.priority = assignPriority(msg);
+ pm.timeInQueue = m.getTimeInQueue(now);
+ pm.updateGroup = determineUpdateGroup(m);
+ pm.updateSequence = determineUpdateSequence(m);
+
+ if (pm.message.getMessageId() == msg.getMessageId()) {
+ pmsg = pm;
+
+ if (event != Event.COMPLETED) {
+ waiting.add(pm);
+ }
+ } else {
+ MessageStatusValue s = m.getStatusHistory().get(0).getStatus();
+ if (s == MessageStatusValue.IN_QUEUE) {
+ waiting.add(pm);
+ } else if (s == MessageStatusValue.PROCESSING_SYNC || s == MessageStatusValue.PROCESSING_ASYNC) {
+ processing.add(pm);
+ }
+ }
+ }
+
+ log.info(" Processing:");
+ for (PrioritizedMessage pm : processing) {
+ log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup
+ + " | " + pm.updateSequence);
+ }
+ log.info(" Waiting:");
+ for (PrioritizedMessage pm : waiting) {
+ log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup
+ + " | " + pm.updateSequence);
+ }
+
+ log.info(" Determined actions:");
+
+ Collections.sort(waiting, (pm1, pm2) -> comparePrioritizedMessages(pm1, pm2));
+
+ Map<Long, MessageAction> mm = new HashMap<>();
+
+ List<PrioritizedMessage> skipList = findMessagesToSkip(processing, waiting);
+ addNextActionComplete(mm, skipList, Resolution.SKIPPED, now);
+
+ waiting.removeAll(skipList);
+
+ List<PrioritizedMessage> expiredList = findExpiredMessages(processing, waiting);
+ addNextActionComplete(mm, expiredList, Resolution.EXPIRED, now);
+
+ waiting.removeAll(expiredList);
+
+ List<PrioritizedMessage> dropList = findMessagesToDrop(processing, waiting);
+ addNextActionComplete(mm, dropList, Resolution.DROPPED, now);
+
+ waiting.removeAll(dropList);
+
+ List<PrioritizedMessage> processList = findMessagesToProcess(processing, waiting);
+ for (PrioritizedMessage pm : processList) {
+ MessageActionValue action = MessageActionValue.PROCESS;
+ MessageData returnResponse = null;
+ if (async) {
+ action = MessageActionValue.RETURN_PROCESS;
+ returnResponse = ackResponse(pm.message);
+ }
+ MessageAction a = new MessageAction(action, null, 0, returnResponse);
+ mm.put(pm.message.getMessageId(), a);
+ log.info(" -- Next action for message: " + pm.message + ": " + a.getAction());
+ }
+
+ if (event != Event.COMPLETED && !mm.containsKey(pmsg.message.getMessageId())) {
+ MessageActionValue action = MessageActionValue.HOLD;
+ int holdTime = pmsg.updateGroup != null ? updateWaitTime : maxTimeInQueue;
+ MessageData returnResponse = null;
+ if (async) {
+ action = MessageActionValue.RETURN_HOLD;
+ returnResponse = ackResponse(pmsg.message);
+ }
+ MessageAction a = new MessageAction(action, null, holdTime, returnResponse);
+ mm.put(pmsg.message.getMessageId(), a);
+ log.info(" -- Next action for message: " + pmsg.message + ": " + a.getAction() + ": "
+ + a.getHoldTime());
+
+ waiting.remove(pmsg);
+ }
+
+ if (event == Event.COMPLETED) {
+ MessageActionValue action = MessageActionValue.RETURN_COMPLETE;
+ String resolution = Resolution.PROCESSED.toString();
+ MessageData returnResponse = completeResponse(Resolution.PROCESSED, msg);
+ MessageAction a = new MessageAction(action, resolution, 0, returnResponse);
+ mm.put(pmsg.message.getMessageId(), a);
+ log.info(" -- Next action for message: " + pmsg.message + ": " + a.getAction() + ": "
+ + a.getResolution());
+ }
+
+ long t2 = System.currentTimeMillis();
+ log.info("<<<<< Handler completed for message: " + msg + ": " + event + ": Time: " + (t2 - t1));
+ return mm;
+ }
+
+ private void addNextActionComplete(Map<Long, MessageAction> mm, List<PrioritizedMessage> skipList, Resolution r,
+ Date now) {
+ for (PrioritizedMessage pm : skipList) {
+ MessageActionValue action = MessageActionValue.RETURN_COMPLETE;
+ String resolution = r.toString();
+ MessageData returnResponse = completeResponse(r, pm.message);
+ MessageAction a = new MessageAction(action, resolution, 0, returnResponse);
+ mm.put(pm.message.getMessageId(), a);
+ log.info(" -- Next action for message: " + pm.message + ": " + a.getAction() + ": "
+ + a.getResolution());
+ }
+ }
+
+ protected List<PrioritizedMessage> findMessagesToSkip(List<PrioritizedMessage> processing,
+ List<PrioritizedMessage> waiting) {
+ List<PrioritizedMessage> ll = new ArrayList<>();
+ Map<String, PrioritizedMessage> lastMap = new HashMap<>();
+ for (PrioritizedMessage pm : waiting) {
+ if (pm.updateGroup != null) {
+ PrioritizedMessage last = lastMap.get(pm.updateGroup);
+ if (last == null) {
+ lastMap.put(pm.updateGroup, pm);
+ } else {
+ if (pm.updateSequence > last.updateSequence) {
+ ll.add(last);
+ lastMap.put(pm.updateGroup, pm);
+ }
+ }
+ }
+ }
+ return ll;
+ }
+
+ protected List<PrioritizedMessage> findExpiredMessages(List<PrioritizedMessage> processing,
+ List<PrioritizedMessage> waiting) {
+ List<PrioritizedMessage> ll = new ArrayList<>();
+ if (timeToLive > 0) {
+ for (PrioritizedMessage pm : waiting) {
+ if (pm.timeInQueue > timeToLive) {
+ ll.add(pm);
+ }
+ }
+ }
+ return ll;
+ }
+
+ protected List<PrioritizedMessage> findMessagesToDrop(List<PrioritizedMessage> processing,
+ List<PrioritizedMessage> waiting) {
+ List<PrioritizedMessage> ll = new ArrayList<>();
+ if (maxQueueSize > 0) {
+ // Drop the least important messages (last in the prioritized waiting list)
+ for (int i = maxQueueSize; i < waiting.size(); i++) {
+ ll.add(waiting.get(i));
+ }
+ }
+ return ll;
+ }
+
+ protected List<PrioritizedMessage> findMessagesToProcess(List<PrioritizedMessage> processing,
+ List<PrioritizedMessage> waiting) {
+ List<PrioritizedMessage> ll = new ArrayList<>();
+
+ if (processing.size() >= maxParallelCount) {
+ return ll;
+ }
+
+ // Find messages allowed to be processed based on the concurrency rules
+
+ List<List<String>> currentLocks = new ArrayList<>();
+ for (PrioritizedMessage m : processing) {
+ List<List<String>> locks = determineConcurency(m.message);
+ if (locks != null) {
+ currentLocks.addAll(locks);
+ }
+ }
+
+ List<PrioritizedMessage> allowed = new ArrayList<>();
+ for (PrioritizedMessage m : waiting) {
+ List<List<String>> neededLocks = determineConcurency(m.message);
+ if (allowed(currentLocks, neededLocks)) {
+ allowed.add(m);
+ }
+ }
+
+ // Remove messages that are waiting on hold
+ Iterator<PrioritizedMessage> ii = allowed.iterator();
+ while (ii.hasNext()) {
+ PrioritizedMessage pm = ii.next();
+ if (pm.updateGroup != null) {
+ log.info(" --- Check: " + pm.message + ": " + pm.timeInQueue + " >= " + updateWaitTime);
+ if (pm.timeInQueue < updateWaitTime) {
+ ii.remove();
+ }
+ }
+ }
+
+ // Limit the number of processing messages to maxParallelCount
+
+ for (int i = 0; i < allowed.size() && i < maxParallelCount - processing.size(); i++) {
+ ll.add(allowed.get(i));
+ }
+
+ return ll;
+ }
+
+ private int comparePrioritizedMessages(PrioritizedMessage pm1, PrioritizedMessage pm2) {
+ if (pm1.timeInQueue >= maxTimeInQueue && pm2.timeInQueue < maxTimeInQueue) {
+ return -1;
+ }
+ if (pm1.timeInQueue < maxTimeInQueue && pm2.timeInQueue >= maxTimeInQueue) {
+ return 1;
+ }
+ if (pm1.priority < pm2.priority) {
+ return -1;
+ }
+ if (pm1.priority > pm2.priority) {
+ return 1;
+ }
+ if (pm1.timeInQueue > pm2.timeInQueue) {
+ return -1;
+ }
+ if (pm1.timeInQueue < pm2.timeInQueue) {
+ return 1;
+ }
+ return 0;
+ }
+
+ private boolean allowed(List<List<String>> currentLocks, List<List<String>> neededLocks) {
+ if (neededLocks == null) {
+ return true;
+ }
+ for (List<String> neededLockLevels : neededLocks) {
+ for (List<String> currentLockLevels : currentLocks) {
+ int n = neededLockLevels.size() < currentLockLevels.size() ? neededLockLevels.size()
+ : currentLockLevels.size();
+ boolean good = false;
+ for (int i = 0; i < n; i++) {
+ if (!neededLockLevels.get(i).equals(currentLockLevels.get(i))) {
+ good = true;
+ break;
+ }
+ }
+ if (!good) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ protected void initMessage(Message msg) {}
+
+ protected void closeMessage(Message msg) {}
+
+ protected long assignPriority(Message msg) {
+ return msg.getMessageId(); // FIFO by default
+ }
+
+ protected String determineUpdateGroup(Message msg) {
+ return null;
+ }
+
+ protected long determineUpdateSequence(Message msg) {
+ return msg.getMessageId(); // Order of receiving by default
+ }
+
+ protected List<List<String>> determineConcurency(Message msg) {
+ return null;
+ }
+
+ protected MessageData ackResponse(Message msg) {
+ return null;
+ }
+
+ protected MessageData completeResponse(Resolution r, Message msg) {
+ return null;
+ }
+
+ protected static enum Resolution {
+ PROCESSED, SKIPPED, EXPIRED, DROPPED
+ }
+
+ protected static class PrioritizedMessage {
+
+ public Message message;
+ public long priority;
+ public int timeInQueue;
+ public String updateGroup;
+ public long updateSequence;
+ }
+
+ public void setMaxParallelCount(int maxParallelCount) {
+ this.maxParallelCount = maxParallelCount;
+ }
+
+ public void setTimeToLive(int timeToLive) {
+ this.timeToLive = timeToLive;
+ }
+
+ public void setMaxTimeInQueue(int maxTimeInQueue) {
+ this.maxTimeInQueue = maxTimeInQueue;
+ }
+
+ public void setUpdateWaitTime(int updateWaitTime) {
+ this.updateWaitTime = updateWaitTime;
+ }
+
+ public void setMaxQueueSize(int maxQueueSize) {
+ this.maxQueueSize = maxQueueSize;
+ }
+
+ public void setAsync(boolean async) {
+ this.async = async;
+ }
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java
index 61059e067..44897ee15 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java
@@ -11,47 +11,47 @@ import org.onap.ccsdk.features.lib.rlock.LockHelper;
public class MessageInterceptorFactoryImpl implements MessageInterceptorFactory {
- private MessageClassifier messageClassifier;
- private Map<String, MessageQueueHandler> handlerMap;
- private MessageProcessor messageProcessor;
- private MessageDao messageDao;
-
- private LockHelper lockHelper;
- private int lockTimeout; // in seconds
-
- @Override
- public MessageInterceptor create() {
- MessageInterceptorImpl mi = new MessageInterceptorImpl();
- mi.setMessageClassifier(messageClassifier);
- mi.setHandlerMap(handlerMap);
- mi.setMessageProcessor(messageProcessor);
- mi.setMessageDao(messageDao);
- mi.setLockHelper(lockHelper);
- mi.setLockTimeout(lockTimeout);
- return mi;
- }
-
- public void setMessageDao(MessageDao messageDao) {
- this.messageDao = messageDao;
- }
-
- public void setLockHelper(LockHelper lockHelper) {
- this.lockHelper = lockHelper;
- }
-
- public void setLockTimeout(int lockTimeout) {
- this.lockTimeout = lockTimeout;
- }
-
- public void setMessageClassifier(MessageClassifier messageClassifier) {
- this.messageClassifier = messageClassifier;
- }
-
- public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) {
- this.handlerMap = handlerMap;
- }
-
- public void setMessageProcessor(MessageProcessor messageProcessor) {
- this.messageProcessor = messageProcessor;
- }
+ private MessageClassifier messageClassifier;
+ private Map<String, MessageQueueHandler> handlerMap;
+ private MessageProcessor messageProcessor;
+ private MessageDao messageDao;
+
+ private LockHelper lockHelper;
+ private int lockTimeout; // in seconds
+
+ @Override
+ public MessageInterceptor create() {
+ MessageInterceptorImpl mi = new MessageInterceptorImpl();
+ mi.setMessageClassifier(messageClassifier);
+ mi.setHandlerMap(handlerMap);
+ mi.setMessageProcessor(messageProcessor);
+ mi.setMessageDao(messageDao);
+ mi.setLockHelper(lockHelper);
+ mi.setLockTimeout(lockTimeout);
+ return mi;
+ }
+
+ public void setMessageDao(MessageDao messageDao) {
+ this.messageDao = messageDao;
+ }
+
+ public void setLockHelper(LockHelper lockHelper) {
+ this.lockHelper = lockHelper;
+ }
+
+ public void setLockTimeout(int lockTimeout) {
+ this.lockTimeout = lockTimeout;
+ }
+
+ public void setMessageClassifier(MessageClassifier messageClassifier) {
+ this.messageClassifier = messageClassifier;
+ }
+
+ public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) {
+ this.handlerMap = handlerMap;
+ }
+
+ public void setMessageProcessor(MessageProcessor messageProcessor) {
+ this.messageProcessor = messageProcessor;
+ }
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java
index d2ab97101..89f29b327 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java
@@ -25,312 +25,298 @@ import org.slf4j.LoggerFactory;
public class MessageInterceptorImpl implements MessageInterceptor {
- private static final Logger log = LoggerFactory.getLogger(MessageInterceptorImpl.class);
-
- private MessageClassifier messageClassifier;
- private Map<String, MessageQueueHandler> handlerMap;
- private MessageProcessor messageProcessor;
- private MessageDao messageDao;
-
- private LockHelper lockHelper;
- private int lockTimeout = 10; // in seconds
-
- private Message message = null;
- private MessageQueueHandler handler = null;
-
- @Override
- public MessageData processRequest(MessageData request) {
- Date now = new Date();
-
- Queue q = null;
- String extMessageId = null;
- if (messageClassifier != null) {
- q = messageClassifier.determineQueue(request);
- extMessageId = messageClassifier.getExtMessageId(request);
- }
-
- long id = messageDao.addArrivedMessage(extMessageId, request, q, now);
-
- MessageStatus arrivedStatus = new MessageStatus();
- arrivedStatus.status = MessageStatusValue.ARRIVED;
- arrivedStatus.timestamp = now;
- messageDao.addStatus(id, arrivedStatus);
-
- message = new Message();
- message.messageId = id;
- message.request = request;
- message.arrivedTimestamp = now;
- message.queue = q;
- message.extMessageId = extMessageId;
-
- log.info("Message received: " + message);
-
- if (q != null && handlerMap != null) {
- handler = handlerMap.get(q.type);
- }
-
- if (q == null || handler == null) {
- processSync();
- return null; // Do nothing, normal message processing
- }
-
- Event event = Event.ARRIVED;
-
- while (true) {
- MessageFunction func = new MessageFunction(event);
- func.exec();
-
- MessageAction nextAction = func.getNextAction();
- if (nextAction == null) {
- processSync();
- return null;
- }
-
- switch (nextAction.action) {
-
- case PROCESS:
- processSync();
- return null;
-
- case HOLD: {
- event = waitForNewAction(nextAction.holdTime);
- break;
- }
-
- case RETURN_COMPLETE:
- returnComplete(nextAction);
- return nextAction.returnResponse;
-
- case RETURN_PROCESS:
- processAsync(nextAction.returnResponse);
- return nextAction.returnResponse;
-
- case RETURN_HOLD: {
- returnHold(nextAction);
- return nextAction.returnResponse;
- }
- }
- }
- }
-
- private void processSync() {
- messageDao.updateMessageStarted(message.messageId, new Date());
- log.info("Message processing started: " + message);
- }
-
- private void processAsync(MessageData returnResponse) {
- Thread t = new Thread(() -> processMessage(message.request), message.queue.type + "::" + message.queue.id + "::" + message.messageId);
- t.start();
-
- messageDao.updateMessageResponse(message.messageId, new Date(), returnResponse);
- }
-
- private void processMessage(MessageData request) {
- messageDao.updateMessageStarted(message.messageId, new Date());
- log.info("Message processing started: " + message);
- if (messageProcessor != null) {
- messageProcessor.processMessage(request);
- }
-
- MessageFunction func = new MessageFunction(Event.COMPLETED);
- func.exec();
- MessageAction nextAction = func.getNextAction();
-
- messageDao.updateMessageCompleted(message.messageId, nextAction.resolution, new Date());
- log.info("Message processing completed: " + message);
- }
-
- private void returnComplete(MessageAction nextAction) {
- Date now = new Date();
- messageDao.updateMessageResponse(message.messageId, now, nextAction.returnResponse);
- messageDao.updateMessageCompleted(message.messageId, nextAction.resolution, now);
- log.info("Message processing completed: " + message);
- }
-
- private void returnHold(MessageAction nextAction) {
- Thread t = new Thread(() -> asyncQueue(nextAction), message.queue.type + "::" + message.queue.id + "::" + message.messageId);
- t.start();
- messageDao.updateMessageResponse(message.messageId, new Date(), nextAction.returnResponse);
- }
-
- private void asyncQueue(MessageAction nextAction) {
- Event event = waitForNewAction(nextAction.holdTime);
-
- while (true) {
- MessageFunction func = new MessageFunction(event);
- func.exec();
-
- nextAction = func.getNextAction();
- if (nextAction == null) {
- processMessage(message.request);
- return;
- }
-
- switch (nextAction.action) {
- case PROCESS:
- processMessage(message.request);
- return;
-
- case HOLD: {
- event = waitForNewAction(nextAction.holdTime);
- break;
- }
-
- default:
- return;
- }
- }
- }
-
- private Event waitForNewAction(int holdTime) {
- long startTime = System.currentTimeMillis();
- long currentTime = startTime;
- while (currentTime - startTime <= (holdTime + 1) * 1000) {
- try {
- Thread.sleep(5000);
- } catch (Exception e) {
- }
-
- MessageAction nextAction = messageDao.getNextAction(message.messageId);
- if (nextAction != null && nextAction.action != MessageActionValue.HOLD) {
- return Event.AWAKEN;
- }
-
- currentTime = System.currentTimeMillis();
- }
- return Event.CHECK;
- }
-
- @Override
- public void processResponse(MessageData response) {
- if (message == null) {
- return;
- }
-
- String resolution = null;
- if (message.queue != null && handler != null) {
- MessageFunction func = new MessageFunction(Event.COMPLETED);
- func.exec();
- MessageAction nextAction = func.getNextAction();
- if (nextAction != null) {
- resolution = nextAction.resolution;
- }
- }
-
- Date now = new Date();
- messageDao.updateMessageResponse(message.messageId, now, response);
- messageDao.updateMessageCompleted(message.messageId, resolution, now);
- log.info("Message processing completed: " + message);
- }
-
- private class MessageFunction extends SynchronizedFunction {
-
- private Event event;
-
- private MessageAction nextAction = null; // Output
-
- public MessageFunction(Event event) {
- super(lockHelper, Collections.singleton(message.queue.type + "::" + message.queue.id), lockTimeout);
- this.event = event;
- }
-
- @Override
- protected void _exec() {
- List<Message> messageQueue = messageDao.readMessageQueue(message.queue);
- if (event == Event.AWAKEN) {
- for (Message m : messageQueue) {
- if (m.messageId == message.messageId) {
- nextAction = m.actionHistory.get(0);
- }
- }
- if (nextAction != null) {
- messageDao.updateActionDone(nextAction.actionId, new Date());
- }
- } else {
- Map<Long, MessageAction> nextActionMap = handler.nextAction(event, message, messageQueue);
- if (nextActionMap != null) {
- for (Message m : messageQueue) {
- MessageAction action = nextActionMap.get(m.messageId);
- if (action != null) {
- if (m.messageId == message.messageId) {
- action.actionStatus = ActionStatus.DONE;
- action.doneTimestamp = new Date();
- messageDao.addAction(m.messageId, action);
- nextAction = action;
- } else {
- MessageAction lastAction = m.actionHistory.get(0);
- if (lastAction.actionStatus != ActionStatus.PENDING || !action.same(lastAction)) {
- action.actionStatus = ActionStatus.PENDING;
- messageDao.addAction(m.messageId, action);
- }
- }
- }
- }
- }
- }
- if (nextAction != null) {
- log.info("Next message action: " + message + ":" + nextAction.action);
- MessageStatus status = determineStatus(nextAction);
- if (status != null) {
- messageDao.addStatus(message.messageId, status);
- log.info("Updating message status: " + message + ":" + status.status);
- }
- }
- }
-
- public MessageAction getNextAction() {
- return nextAction;
- }
- }
-
- private MessageStatus determineStatus(MessageAction action) {
- if (action == null) {
- return null;
- }
-
- MessageStatus s = new MessageStatus();
- s.timestamp = new Date();
-
- switch (action.action) {
- case PROCESS:
- s.status = MessageStatusValue.PROCESSING_SYNC;
- break;
- case HOLD:
- case RETURN_HOLD:
- s.status = MessageStatusValue.IN_QUEUE;
- break;
- case RETURN_PROCESS:
- s.status = MessageStatusValue.PROCESSING_ASYNC;
- break;
- case RETURN_COMPLETE:
- s.status = MessageStatusValue.COMPLETED;
- break;
- }
-
- return s;
- }
-
- public void setMessageDao(MessageDao messageDao) {
- this.messageDao = messageDao;
- }
-
- public void setLockHelper(LockHelper lockHelper) {
- this.lockHelper = lockHelper;
- }
-
- public void setLockTimeout(int lockTimeout) {
- this.lockTimeout = lockTimeout;
- }
-
- public void setMessageClassifier(MessageClassifier messageClassifier) {
- this.messageClassifier = messageClassifier;
- }
-
- public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) {
- this.handlerMap = handlerMap;
- }
-
- public void setMessageProcessor(MessageProcessor messageProcessor) {
- this.messageProcessor = messageProcessor;
- }
+ private static final Logger log = LoggerFactory.getLogger(MessageInterceptorImpl.class);
+
+ private MessageClassifier messageClassifier;
+ private Map<String, MessageQueueHandler> handlerMap;
+ private MessageProcessor messageProcessor;
+ private MessageDao messageDao;
+
+ private LockHelper lockHelper;
+ private int lockTimeout = 10; // in seconds
+
+ private Message message = null;
+ private MessageQueueHandler handler = null;
+
+ @Override
+ public MessageData processRequest(MessageData request) {
+ Date now = new Date();
+
+ Queue queue = null;
+ String extMessageId = null;
+ if (messageClassifier != null) {
+ queue = messageClassifier.determineQueue(request);
+ extMessageId = messageClassifier.getExtMessageId(request);
+ }
+
+ long id = messageDao.addArrivedMessage(extMessageId, request, queue, now);
+
+ MessageStatus arrivedStatus = new MessageStatus(MessageStatusValue.ARRIVED, now);
+ messageDao.addStatus(id, arrivedStatus);
+
+ message = new Message(id, extMessageId, request, queue);
+
+ log.info("Message received: " + message);
+
+ if (queue != null && handlerMap != null) {
+ handler = handlerMap.get(queue.getType());
+ }
+
+ if (queue == null || handler == null) {
+ processSync();
+ return null; // Do nothing, normal message processing
+ }
+
+ Event event = Event.ARRIVED;
+
+ while (true) {
+ MessageFunction func = new MessageFunction(event);
+ func.exec();
+
+ MessageAction nextAction = func.getNextAction();
+ if (nextAction == null) {
+ processSync();
+ return null;
+ }
+
+ switch (nextAction.getAction()) {
+
+ case PROCESS:
+ processSync();
+ return null;
+
+ case HOLD: {
+ event = waitForNewAction(nextAction.getHoldTime());
+ break;
+ }
+
+ case RETURN_COMPLETE:
+ returnComplete(nextAction);
+ return nextAction.getReturnResponse();
+
+ case RETURN_PROCESS:
+ processAsync(nextAction.getReturnResponse());
+ return nextAction.getReturnResponse();
+
+ case RETURN_HOLD: {
+ returnHold(nextAction);
+ return nextAction.getReturnResponse();
+ }
+ }
+ }
+ }
+
+ private void processSync() {
+ messageDao.updateMessageStarted(message.getMessageId(), new Date());
+ log.info("Message processing started: " + message);
+ }
+
+ private void processAsync(MessageData returnResponse) {
+ Thread t = new Thread(() -> processMessage(message.getRequest()),
+ message.getQueue().getType() + "::" + message.getQueue().getId() + "::" + message.getMessageId());
+ t.start();
+
+ messageDao.updateMessageResponse(message.getMessageId(), new Date(), returnResponse);
+ }
+
+ private void processMessage(MessageData request) {
+ messageDao.updateMessageStarted(message.getMessageId(), new Date());
+ log.info("Message processing started: " + message);
+ if (messageProcessor != null) {
+ messageProcessor.processMessage(request);
+ }
+
+ MessageFunction func = new MessageFunction(Event.COMPLETED);
+ func.exec();
+ MessageAction nextAction = func.getNextAction();
+
+ messageDao.updateMessageCompleted(message.getMessageId(), nextAction.getResolution(), new Date());
+ log.info("Message processing completed: " + message);
+ }
+
+ private void returnComplete(MessageAction nextAction) {
+ Date now = new Date();
+ messageDao.updateMessageResponse(message.getMessageId(), now, nextAction.getReturnResponse());
+ messageDao.updateMessageCompleted(message.getMessageId(), nextAction.getResolution(), now);
+ log.info("Message processing completed: " + message);
+ }
+
+ private void returnHold(MessageAction nextAction) {
+ Thread t = new Thread(() -> asyncQueue(nextAction),
+ message.getQueue().getType() + "::" + message.getQueue().getId() + "::" + message.getMessageId());
+ t.start();
+ messageDao.updateMessageResponse(message.getMessageId(), new Date(), nextAction.getReturnResponse());
+ }
+
+ private void asyncQueue(MessageAction nextAction) {
+ Event event = waitForNewAction(nextAction.getHoldTime());
+
+ while (true) {
+ MessageFunction func = new MessageFunction(event);
+ func.exec();
+
+ nextAction = func.getNextAction();
+ if (nextAction == null) {
+ processMessage(message.getRequest());
+ return;
+ }
+
+ switch (nextAction.getAction()) {
+ case PROCESS:
+ processMessage(message.getRequest());
+ return;
+
+ case HOLD: {
+ event = waitForNewAction(nextAction.getHoldTime());
+ break;
+ }
+
+ default:
+ return;
+ }
+ }
+ }
+
+ private Event waitForNewAction(int holdTime) {
+ long startTime = System.currentTimeMillis();
+ long currentTime = startTime;
+ while (currentTime - startTime <= (holdTime + 1) * 1000) {
+ try {
+ Thread.sleep(5000);
+ } catch (Exception e) {
+ }
+
+ MessageAction nextAction = messageDao.getNextAction(message.getMessageId());
+ if (nextAction != null && nextAction.getAction() != MessageActionValue.HOLD) {
+ return Event.AWAKEN;
+ }
+
+ currentTime = System.currentTimeMillis();
+ }
+ return Event.CHECK;
+ }
+
+ @Override
+ public void processResponse(MessageData response) {
+ if (message == null) {
+ return;
+ }
+
+ String resolution = null;
+ if (message.getQueue() != null && handler != null) {
+ MessageFunction func = new MessageFunction(Event.COMPLETED);
+ func.exec();
+ MessageAction nextAction = func.getNextAction();
+ if (nextAction != null) {
+ resolution = nextAction.getResolution();
+ }
+ }
+
+ Date now = new Date();
+ messageDao.updateMessageResponse(message.getMessageId(), now, response);
+ messageDao.updateMessageCompleted(message.getMessageId(), resolution, now);
+ log.info("Message processing completed: " + message);
+ }
+
+ private class MessageFunction extends SynchronizedFunction {
+
+ private Event event;
+
+ private MessageAction nextAction = null; // Output
+
+ public MessageFunction(Event event) {
+ super(lockHelper, Collections.singleton(message.getQueue().getType() + "::" + message.getQueue().getId()),
+ lockTimeout);
+ this.event = event;
+ }
+
+ @Override
+ protected void _exec() {
+ List<Message> messageQueue = messageDao.readMessageQueue(message.getQueue());
+ if (event == Event.AWAKEN) {
+ for (Message m : messageQueue) {
+ if (m.getMessageId() == message.getMessageId()) {
+ nextAction = m.getActionHistory().get(0);
+ }
+ }
+ if (nextAction != null) {
+ messageDao.updateActionDone(nextAction.getActionId(), new Date());
+ }
+ } else {
+ Map<Long, MessageAction> nextActionMap = handler.nextAction(event, message, messageQueue);
+ if (nextActionMap != null) {
+ for (Message m : messageQueue) {
+ MessageAction action = nextActionMap.get(m.getMessageId());
+ if (action != null) {
+ if (m.getMessageId() == message.getMessageId()) {
+ action.setDone();
+ messageDao.addAction(m.getMessageId(), action);
+ nextAction = action;
+ } else {
+ MessageAction lastAction = m.getActionHistory().get(0);
+ if (lastAction.getActionStatus() != ActionStatus.PENDING || !action.same(lastAction)) {
+ messageDao.addAction(m.getMessageId(), action);
+ }
+ }
+ }
+ }
+ }
+ }
+ if (nextAction != null) {
+ log.info("Next message action: " + message + ":" + nextAction.getAction());
+ MessageStatus status = determineStatus(nextAction);
+ if (status != null) {
+ messageDao.addStatus(message.getMessageId(), status);
+ log.info("Updating message status: " + message + ":" + status.getStatus());
+ }
+ }
+ }
+
+ public MessageAction getNextAction() {
+ return nextAction;
+ }
+ }
+
+ private MessageStatus determineStatus(MessageAction action) {
+ if (action == null) {
+ return null;
+ }
+
+ switch (action.getAction()) {
+ case PROCESS:
+ return new MessageStatus(MessageStatusValue.PROCESSING_SYNC);
+ case HOLD:
+ case RETURN_HOLD:
+ return new MessageStatus(MessageStatusValue.IN_QUEUE);
+ case RETURN_PROCESS:
+ return new MessageStatus(MessageStatusValue.PROCESSING_ASYNC);
+ case RETURN_COMPLETE:
+ return new MessageStatus(MessageStatusValue.COMPLETED);
+ }
+ return null;
+ }
+
+ public void setMessageDao(MessageDao messageDao) {
+ this.messageDao = messageDao;
+ }
+
+ public void setLockHelper(LockHelper lockHelper) {
+ this.lockHelper = lockHelper;
+ }
+
+ public void setLockTimeout(int lockTimeout) {
+ this.lockTimeout = lockTimeout;
+ }
+
+ public void setMessageClassifier(MessageClassifier messageClassifier) {
+ this.messageClassifier = messageClassifier;
+ }
+
+ public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) {
+ this.handlerMap = handlerMap;
+ }
+
+ public void setMessageProcessor(MessageProcessor messageProcessor) {
+ this.messageProcessor = messageProcessor;
+ }
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java
index 73c3d8b6a..b8296ec19 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java
@@ -13,7 +13,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@@ -31,206 +30,199 @@ import org.onap.ccsdk.features.lib.doorman.data.MessageData;
public class MessageInterceptorFilter implements Filter {
- private MessageInterceptorFactory messageInterceptorFactory;
-
- @Override
- public void init(FilterConfig filterConfig) throws ServletException {
- }
-
- @Override
- public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
- RequestWrapper req = new RequestWrapper((HttpServletRequest) request);
- MessageData requestData = getMessageRequest(req);
-
- MessageInterceptor interceptor = messageInterceptorFactory.create();
- MessageData responseData = interceptor.processRequest(requestData);
-
- if (responseData == null) {
- ResponseWrapper res = new ResponseWrapper((HttpServletResponse) response);
-
- chain.doFilter(req, res);
-
- responseData = res.getMessageResponse();
- interceptor.processResponse(responseData);
- }
-
- setMessageResponse((HttpServletResponse) response, responseData);
- }
-
- @SuppressWarnings("unchecked")
- private void setMessageResponse(HttpServletResponse response, MessageData responseData) throws IOException {
- if (responseData.param != null) {
- String contentType = (String) responseData.param.get("content_type");
- if (contentType != null) {
- response.setContentType(contentType);
- }
- Integer httpCode = (Integer) responseData.param.get("http_code");
- if (httpCode != null) {
- response.setStatus(httpCode);
- }
- Map<String, Object> headers = (Map<String, Object>) responseData.param.get("headers");
- if (headers != null) {
- for (Entry<String, Object> entry : headers.entrySet()) {
- String name = entry.getKey();
- Object v = entry.getValue();
- if (v instanceof List) {
- List<Object> ll = (List<Object>) v;
- for (Object o : ll) {
- response.addHeader(name, o.toString());
- }
- } else {
- response.setHeader(name, v.toString());
- }
- }
- }
- }
-
- if (responseData.body != null) {
- response.setContentLength(responseData.body.length());
- response.getWriter().write(responseData.body);
- }
- }
-
- @SuppressWarnings("unchecked")
- private MessageData getMessageRequest(RequestWrapper request) {
- MessageData requestData = new MessageData();
- requestData.param = new HashMap<>();
- requestData.param.put("http_method", request.getMethod());
- requestData.param.put("uri", request.getPathInfo());
- requestData.param.put("param", request.getParameterMap());
- requestData.param.put("content_type", request.getContentType());
-
- Map<String, Object> headers = new HashMap<>();
- Enumeration<String> headerNames = request.getHeaderNames();
- while (headerNames.hasMoreElements()) {
- String name = headerNames.nextElement();
- Enumeration<String> values = request.getHeaders(name);
- List<String> valueList = new ArrayList<>();
- while (values.hasMoreElements()) {
- valueList.add(values.nextElement());
- }
- if (valueList.size() > 1) {
- headers.put(name, valueList);
- } else {
- headers.put(name, valueList.get(0));
- }
- }
- requestData.param.put("headers", headers);
-
- requestData.body = request.getBody();
-
- return requestData;
- }
-
- @Override
- public void destroy() {
- }
-
- private static class RequestWrapper extends HttpServletRequestWrapper {
-
- private final String body;
-
- public RequestWrapper(HttpServletRequest request) throws IOException {
- super(request);
-
- StringBuilder stringBuilder = new StringBuilder();
- try (InputStream inputStream = request.getInputStream()) {
- if (inputStream != null) {
- try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
- char[] charBuffer = new char[128];
- int bytesRead = -1;
- while ((bytesRead = bufferedReader.read(charBuffer)) > 0) {
- stringBuilder.append(charBuffer, 0, bytesRead);
- }
- }
- }
- }
- body = stringBuilder.toString();
- }
-
- @Override
- public ServletInputStream getInputStream() throws IOException {
- final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body.getBytes());
- ServletInputStream servletInputStream = new ServletInputStream() {
-
- @Override
- public int read() throws IOException {
- return byteArrayInputStream.read();
- }
- };
- return servletInputStream;
- }
-
- @Override
- public BufferedReader getReader() throws IOException {
- return new BufferedReader(new InputStreamReader(getInputStream()));
- }
-
- public String getBody() {
- return body;
- }
- }
-
- public class ResponseWrapper extends HttpServletResponseWrapper {
-
- private CharArrayWriter writer = new CharArrayWriter();
- private Map<String, Object> param = new HashMap<>();
-
- public ResponseWrapper(HttpServletResponse response) {
- super(response);
- }
-
- @Override
- public PrintWriter getWriter() {
- return new PrintWriter(writer);
- }
-
- @Override
- public void setStatus(int status) {
- param.put("http_code", status);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void setHeader(String name, String value) {
- Map<String, Object> headers = (Map<String, Object>) param.get("headers");
- if (headers == null) {
- headers = new HashMap<>();
- param.put("headers", headers);
- }
- headers.put(name, value);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void addHeader(String name, String value) {
- Map<String, Object> headers = (Map<String, Object>) param.get("headers");
- if (headers == null) {
- headers = new HashMap<>();
- param.put("headers", headers);
- }
- Object v = headers.get(name);
- if (v == null) {
- headers.put(name, value);
- } else if (v instanceof List) {
- ((List<Object>) v).add(value);
- } else {
- List<Object> ll = new ArrayList<>();
- ll.add(v);
- ll.add(value);
- headers.put(name, ll);
- }
- }
-
- public MessageData getMessageResponse() {
- MessageData responseData = new MessageData();
- responseData.param = param;
- responseData.body = writer.toString();
- return responseData;
- }
- }
-
- public void setMessageInterceptorFactory(MessageInterceptorFactory messageInterceptorFactory) {
- this.messageInterceptorFactory = messageInterceptorFactory;
- }
+ private MessageInterceptorFactory messageInterceptorFactory;
+
+ @Override
+ public void init(FilterConfig filterConfig) throws ServletException {}
+
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
+ throws IOException, ServletException {
+ RequestWrapper req = new RequestWrapper((HttpServletRequest) request);
+ MessageData requestData = getMessageRequest(req);
+
+ MessageInterceptor interceptor = messageInterceptorFactory.create();
+ MessageData responseData = interceptor.processRequest(requestData);
+
+ if (responseData == null) {
+ ResponseWrapper res = new ResponseWrapper((HttpServletResponse) response);
+
+ chain.doFilter(req, res);
+
+ responseData = res.getMessageResponse();
+ interceptor.processResponse(responseData);
+ }
+
+ setMessageResponse((HttpServletResponse) response, responseData);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void setMessageResponse(HttpServletResponse response, MessageData responseData) throws IOException {
+ if (responseData.getParam() != null) {
+ String contentType = (String) responseData.getParam().get("content_type");
+ if (contentType != null) {
+ response.setContentType(contentType);
+ }
+ Integer httpCode = (Integer) responseData.getParam().get("http_code");
+ if (httpCode != null) {
+ response.setStatus(httpCode);
+ }
+ Map<String, Object> headers = (Map<String, Object>) responseData.getParam().get("headers");
+ if (headers != null) {
+ for (Entry<String, Object> entry : headers.entrySet()) {
+ String name = entry.getKey();
+ Object v = entry.getValue();
+ if (v instanceof List) {
+ List<Object> ll = (List<Object>) v;
+ for (Object o : ll) {
+ response.addHeader(name, o.toString());
+ }
+ } else {
+ response.setHeader(name, v.toString());
+ }
+ }
+ }
+ }
+
+ if (responseData.getBody() != null) {
+ response.setContentLength(responseData.getBody().length());
+ response.getWriter().write(responseData.getBody());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private MessageData getMessageRequest(RequestWrapper request) {
+ HashMap<String, Object> param = new HashMap<>();
+ param.put("http_method", request.getMethod());
+ param.put("uri", request.getPathInfo());
+ param.put("param", request.getParameterMap());
+ param.put("content_type", request.getContentType());
+
+ Map<String, Object> headers = new HashMap<>();
+ Enumeration<String> headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String name = headerNames.nextElement();
+ Enumeration<String> values = request.getHeaders(name);
+ List<String> valueList = new ArrayList<>();
+ while (values.hasMoreElements()) {
+ valueList.add(values.nextElement());
+ }
+ if (valueList.size() > 1) {
+ headers.put(name, valueList);
+ } else {
+ headers.put(name, valueList.get(0));
+ }
+ }
+ param.put("headers", headers);
+
+ return new MessageData(param, request.getBody());
+ }
+
+ @Override
+ public void destroy() {}
+
+ private static class RequestWrapper extends HttpServletRequestWrapper {
+
+ private final String body;
+
+ public RequestWrapper(HttpServletRequest request) throws IOException {
+ super(request);
+
+ StringBuilder stringBuilder = new StringBuilder();
+ try (InputStream inputStream = request.getInputStream()) {
+ if (inputStream != null) {
+ try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
+ char[] charBuffer = new char[128];
+ int bytesRead = -1;
+ while ((bytesRead = bufferedReader.read(charBuffer)) > 0) {
+ stringBuilder.append(charBuffer, 0, bytesRead);
+ }
+ }
+ }
+ }
+ body = stringBuilder.toString();
+ }
+
+ @Override
+ public ServletInputStream getInputStream() throws IOException {
+ final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body.getBytes());
+ ServletInputStream servletInputStream = new ServletInputStream() {
+
+ @Override
+ public int read() throws IOException {
+ return byteArrayInputStream.read();
+ }
+ };
+ return servletInputStream;
+ }
+
+ @Override
+ public BufferedReader getReader() throws IOException {
+ return new BufferedReader(new InputStreamReader(getInputStream()));
+ }
+
+ public String getBody() {
+ return body;
+ }
+ }
+
+ public class ResponseWrapper extends HttpServletResponseWrapper {
+
+ private CharArrayWriter writer = new CharArrayWriter();
+ private Map<String, Object> param = new HashMap<>();
+
+ public ResponseWrapper(HttpServletResponse response) {
+ super(response);
+ }
+
+ @Override
+ public PrintWriter getWriter() {
+ return new PrintWriter(writer);
+ }
+
+ @Override
+ public void setStatus(int status) {
+ param.put("http_code", status);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setHeader(String name, String value) {
+ Map<String, Object> headers = (Map<String, Object>) param.get("headers");
+ if (headers == null) {
+ headers = new HashMap<>();
+ param.put("headers", headers);
+ }
+ headers.put(name, value);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void addHeader(String name, String value) {
+ Map<String, Object> headers = (Map<String, Object>) param.get("headers");
+ if (headers == null) {
+ headers = new HashMap<>();
+ param.put("headers", headers);
+ }
+ Object v = headers.get(name);
+ if (v == null) {
+ headers.put(name, value);
+ } else if (v instanceof List) {
+ ((List<Object>) v).add(value);
+ } else {
+ List<Object> ll = new ArrayList<>();
+ ll.add(v);
+ ll.add(value);
+ headers.put(name, ll);
+ }
+ }
+
+ public MessageData getMessageResponse() {
+ return new MessageData(param, writer.toString());
+ }
+ }
+
+ public void setMessageInterceptorFactory(MessageInterceptorFactory messageInterceptorFactory) {
+ this.messageInterceptorFactory = messageInterceptorFactory;
+ }
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java
index 42f1b21e7..fefd25fb9 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java
@@ -5,269 +5,281 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DataUtil {
- @SuppressWarnings("unused")
- private static final Logger log = LoggerFactory.getLogger(DataUtil.class);
-
- @SuppressWarnings("unchecked")
- public static Object get(Object struct, String compositeKey) {
- if (struct == null || compositeKey == null) {
- return null;
- }
-
- List<Object> keys = splitCompositeKey(compositeKey);
- Object currentValue = struct;
- String currentKey = "";
-
- for (Object key : keys) {
- if (key instanceof Integer) {
- if (!(currentValue instanceof List)) {
- throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list");
- }
-
- Integer keyi = (Integer) key;
- List<Object> currentValueL = (List<Object>) currentValue;
-
- if (keyi >= currentValueL.size()) {
- return null;
- }
-
- currentValue = currentValueL.get(keyi);
- if (currentValue == null) {
- return null;
- }
-
- currentKey += "[" + key + "]";
- } else {
- if (!(currentValue instanceof Map)) {
- throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map");
- }
-
- currentValue = ((Map<String, Object>) currentValue).get(key);
- if (currentValue == null) {
- return null;
- }
-
- currentKey += "." + key;
- }
- }
- return currentValue;
- }
-
- @SuppressWarnings("unchecked")
- public static void set(Object struct, String compositeKey, Object value) {
- if (struct == null) {
- throw new IllegalArgumentException("Null argument: struct");
- }
-
- if (compositeKey == null || compositeKey.length() == 0) {
- throw new IllegalArgumentException("Null or empty argument: compositeKey");
- }
-
- if (value == null) {
- return;
- }
-
- List<Object> keys = splitCompositeKey(compositeKey);
- Object currentValue = struct;
- String currentKey = "";
-
- for (int i = 0; i < keys.size() - 1; i++) {
- Object key = keys.get(i);
-
- if (key instanceof Integer) {
- if (!(currentValue instanceof List)) {
- throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list");
- }
-
- Integer keyi = (Integer) key;
- List<Object> currentValueL = (List<Object>) currentValue;
- int size = currentValueL.size();
-
- if (keyi >= size) {
- for (int k = 0; k < keyi - size + 1; k++) {
- currentValueL.add(null);
- }
- }
-
- Object newValue = currentValueL.get(keyi);
- if (newValue == null) {
- Object nextKey = keys.get(i + 1);
- if (nextKey instanceof Integer) {
- newValue = new ArrayList<>();
- } else {
- newValue = new HashMap<>();
- }
- currentValueL.set(keyi, newValue);
- }
-
- currentValue = newValue;
- currentKey += "[" + key + "]";
-
- } else {
- if (!(currentValue instanceof Map)) {
- throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map");
- }
-
- Object newValue = ((Map<String, Object>) currentValue).get(key);
- if (newValue == null) {
- Object nextKey = keys.get(i + 1);
- if (nextKey instanceof Integer) {
- newValue = new ArrayList<>();
- } else {
- newValue = new HashMap<>();
- }
- ((Map<String, Object>) currentValue).put((String) key, newValue);
- }
-
- currentValue = newValue;
- currentKey += "." + key;
- }
- }
-
- Object key = keys.get(keys.size() - 1);
- if (key instanceof Integer) {
- if (!(currentValue instanceof List)) {
- throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list");
- }
-
- Integer keyi = (Integer) key;
- List<Object> currentValueL = (List<Object>) currentValue;
- int size = currentValueL.size();
-
- if (keyi >= size) {
- for (int k = 0; k < keyi - size + 1; k++) {
- currentValueL.add(null);
- }
- }
-
- currentValueL.set(keyi, value);
-
- } else {
- if (!(currentValue instanceof Map)) {
- throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map");
- }
-
- ((Map<String, Object>) currentValue).put((String) key, value);
- }
- }
-
- @SuppressWarnings("unchecked")
- public static void delete(Object struct, String compositeKey) {
- if (struct == null) {
- throw new IllegalArgumentException("Null argument: struct");
- }
-
- if (compositeKey == null) {
- throw new IllegalArgumentException("Null argument: compositeKey");
- }
-
- List<Object> keys = splitCompositeKey(compositeKey);
- Object currentValue = struct;
- String currentKey = "";
-
- for (int i = 0; i < keys.size() - 1; i++) {
- Object key = keys.get(i);
-
- if (key instanceof Integer) {
- if (!(currentValue instanceof List)) {
- throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list");
- }
-
- Integer keyi = (Integer) key;
- List<Object> currentValueL = (List<Object>) currentValue;
-
- if (keyi >= currentValueL.size()) {
- return;
- }
-
- currentValue = currentValueL.get(keyi);
- if (currentValue == null) {
- return;
- }
-
- currentKey += "[" + key + "]";
-
- } else {
- if (!(currentValue instanceof Map)) {
- throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map");
- }
-
- currentValue = ((Map<String, Object>) currentValue).get(key);
- if (currentValue == null) {
- return;
- }
-
- currentKey += "." + key;
- }
- }
-
- Object key = keys.get(keys.size() - 1);
- if (key instanceof Integer) {
- if (!(currentValue instanceof List)) {
- throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list");
- }
-
- Integer keyi = (Integer) key;
- List<Object> currentValueL = (List<Object>) currentValue;
-
- if (keyi < currentValueL.size()) {
- currentValueL.remove(keyi);
- }
-
- } else {
- if (!(currentValue instanceof Map)) {
- throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map");
- }
-
- ((Map<String, Object>) currentValue).remove(key);
- }
- }
-
- private static List<Object> splitCompositeKey(String compositeKey) {
- if (compositeKey == null) {
- return Collections.emptyList();
- }
-
- String[] ss = compositeKey.split("\\.");
- List<Object> ll = new ArrayList<>();
- for (String s : ss) {
- if (s.length() == 0) {
- continue;
- }
-
- int i1 = s.indexOf('[');
- if (i1 < 0) {
- ll.add(s);
- } else {
- if (!s.endsWith("]")) {
- throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": No matching ] found");
- }
-
- String s1 = s.substring(0, i1);
- if (s1.length() > 0) {
- ll.add(s1);
- }
-
- String s2 = s.substring(i1 + 1, s.length() - 1);
- try {
- int n = Integer.parseInt(s2);
- if (n < 0) {
- throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": Index must be >= 0: " + n);
- }
-
- ll.add(n);
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": Index not a number: " + s2);
- }
- }
- }
-
- return ll;
- }
+ @SuppressWarnings("unused")
+ private static final Logger log = LoggerFactory.getLogger(DataUtil.class);
+
+ @SuppressWarnings("unchecked")
+ public static Object get(Object struct, String compositeKey) {
+ if (struct == null || compositeKey == null) {
+ return null;
+ }
+
+ List<Object> keys = splitCompositeKey(compositeKey);
+ Object currentValue = struct;
+ String currentKey = "";
+
+ for (Object key : keys) {
+ if (key instanceof Integer) {
+ if (!(currentValue instanceof List)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '"
+ + currentKey + "', but '" + currentKey + "' is not a list");
+ }
+
+ Integer keyi = (Integer) key;
+ List<Object> currentValueL = (List<Object>) currentValue;
+
+ if (keyi >= currentValueL.size()) {
+ return null;
+ }
+
+ currentValue = currentValueL.get(keyi);
+ if (currentValue == null) {
+ return null;
+ }
+
+ currentKey += "[" + key + "]";
+ } else {
+ if (!(currentValue instanceof Map)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '"
+ + currentKey + "', but '" + currentKey + "' is not a map");
+ }
+
+ currentValue = ((Map<String, Object>) currentValue).get(key);
+ if (currentValue == null) {
+ return null;
+ }
+
+ currentKey += "." + key;
+ }
+ }
+ return currentValue;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void set(Object struct, String compositeKey, Object value) {
+ if (struct == null) {
+ throw new IllegalArgumentException("Null argument: struct");
+ }
+
+ if (compositeKey == null || compositeKey.length() == 0) {
+ throw new IllegalArgumentException("Null or empty argument: compositeKey");
+ }
+
+ if (value == null) {
+ return;
+ }
+
+ List<Object> keys = splitCompositeKey(compositeKey);
+ Object currentValue = struct;
+ String currentKey = "";
+
+ for (int i = 0; i < keys.size() - 1; i++) {
+ Object key = keys.get(i);
+
+ if (key instanceof Integer) {
+ if (!(currentValue instanceof List)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '"
+ + currentKey + "', but '" + currentKey + "' is not a list");
+ }
+
+ Integer keyi = (Integer) key;
+ List<Object> currentValueL = (List<Object>) currentValue;
+ int size = currentValueL.size();
+
+ if (keyi >= size) {
+ for (int k = 0; k < keyi - size + 1; k++) {
+ currentValueL.add(null);
+ }
+ }
+
+ Object newValue = currentValueL.get(keyi);
+ if (newValue == null) {
+ Object nextKey = keys.get(i + 1);
+ if (nextKey instanceof Integer) {
+ newValue = new ArrayList<>();
+ } else {
+ newValue = new HashMap<>();
+ }
+ currentValueL.set(keyi, newValue);
+ }
+
+ currentValue = newValue;
+ currentKey += "[" + key + "]";
+
+ } else {
+ if (!(currentValue instanceof Map)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '"
+ + currentKey + "', but '" + currentKey + "' is not a map");
+ }
+
+ Object newValue = ((Map<String, Object>) currentValue).get(key);
+ if (newValue == null) {
+ Object nextKey = keys.get(i + 1);
+ if (nextKey instanceof Integer) {
+ newValue = new ArrayList<>();
+ } else {
+ newValue = new HashMap<>();
+ }
+ ((Map<String, Object>) currentValue).put((String) key, newValue);
+ }
+
+ currentValue = newValue;
+ currentKey += "." + key;
+ }
+ }
+
+ Object key = keys.get(keys.size() - 1);
+ if (key instanceof Integer) {
+ if (!(currentValue instanceof List)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '"
+ + currentKey + "', but '" + currentKey + "' is not a list");
+ }
+
+ Integer keyi = (Integer) key;
+ List<Object> currentValueL = (List<Object>) currentValue;
+ int size = currentValueL.size();
+
+ if (keyi >= size) {
+ for (int k = 0; k < keyi - size + 1; k++) {
+ currentValueL.add(null);
+ }
+ }
+
+ currentValueL.set(keyi, value);
+
+ } else {
+ if (!(currentValue instanceof Map)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey
+ + "', but '" + currentKey + "' is not a map");
+ }
+
+ ((Map<String, Object>) currentValue).put((String) key, value);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void delete(Object struct, String compositeKey) {
+ if (struct == null) {
+ throw new IllegalArgumentException("Null argument: struct");
+ }
+
+ if (compositeKey == null) {
+ throw new IllegalArgumentException("Null argument: compositeKey");
+ }
+
+ List<Object> keys = splitCompositeKey(compositeKey);
+ Object currentValue = struct;
+ String currentKey = "";
+
+ for (int i = 0; i < keys.size() - 1; i++) {
+ Object key = keys.get(i);
+
+ if (key instanceof Integer) {
+ if (!(currentValue instanceof List)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '"
+ + currentKey + "', but '" + currentKey + "' is not a list");
+ }
+
+ Integer keyi = (Integer) key;
+ List<Object> currentValueL = (List<Object>) currentValue;
+
+ if (keyi >= currentValueL.size()) {
+ return;
+ }
+
+ currentValue = currentValueL.get(keyi);
+ if (currentValue == null) {
+ return;
+ }
+
+ currentKey += "[" + key + "]";
+
+ } else {
+ if (!(currentValue instanceof Map)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '"
+ + currentKey + "', but '" + currentKey + "' is not a map");
+ }
+
+ currentValue = ((Map<String, Object>) currentValue).get(key);
+ if (currentValue == null) {
+ return;
+ }
+
+ currentKey += "." + key;
+ }
+ }
+
+ Object key = keys.get(keys.size() - 1);
+ if (key instanceof Integer) {
+ if (!(currentValue instanceof List)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '"
+ + currentKey + "', but '" + currentKey + "' is not a list");
+ }
+
+ Integer keyi = (Integer) key;
+ List<Object> currentValueL = (List<Object>) currentValue;
+
+ if (keyi < currentValueL.size()) {
+ currentValueL.remove(keyi);
+ }
+
+ } else {
+ if (!(currentValue instanceof Map)) {
+ throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey
+ + "', but '" + currentKey + "' is not a map");
+ }
+
+ ((Map<String, Object>) currentValue).remove(key);
+ }
+ }
+
+ private static List<Object> splitCompositeKey(String compositeKey) {
+ if (compositeKey == null) {
+ return Collections.emptyList();
+ }
+
+ String[] ss = compositeKey.split("\\.");
+ List<Object> ll = new ArrayList<>();
+ for (String s : ss) {
+ if (s.length() == 0) {
+ continue;
+ }
+
+ int i1 = s.indexOf('[');
+ if (i1 < 0) {
+ ll.add(s);
+ } else {
+ if (!s.endsWith("]")) {
+ throw new IllegalArgumentException(
+ "Invalid composite key: " + compositeKey + ": No matching ] found");
+ }
+
+ String s1 = s.substring(0, i1);
+ if (s1.length() > 0) {
+ ll.add(s1);
+ }
+
+ String s2 = s.substring(i1 + 1, s.length() - 1);
+ try {
+ int n = Integer.parseInt(s2);
+ if (n < 0) {
+ throw new IllegalArgumentException(
+ "Invalid composite key: " + compositeKey + ": Index must be >= 0: " + n);
+ }
+
+ ll.add(n);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ "Invalid composite key: " + compositeKey + ": Index not a number: " + s2);
+ }
+ }
+ }
+
+ return ll;
+ }
}
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java
index fc9ce0936..8d45a5f22 100644
--- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java
+++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java
@@ -7,313 +7,317 @@ import java.util.Map;
public class JsonUtil {
- public static Object jsonToData(String s) {
- if (s == null) {
- return null;
- }
- return jsonToData(new Current(s));
- }
-
- private static class Current {
-
- public String s;
- public int i;
- public int line, pos;
-
- public Current(String s) {
- this.s = s;
- i = 0;
- line = 1;
- pos = 1;
- }
-
- public void move() {
- i++;
- pos++;
- }
-
- public void move(int k) {
- i += k;
- pos += k;
- }
-
- public boolean end() {
- return i >= s.length();
- }
-
- public char get() {
- return s.charAt(i);
- }
-
- public boolean is(String ss) {
- return i < s.length() - ss.length() && s.substring(i, i + ss.length()).equals(ss);
- }
-
- public boolean is(char c) {
- return i < s.length() && s.charAt(i) == c;
- }
-
- public void skipWhiteSpace() {
- while (i < s.length() && s.charAt(i) <= 32) {
- char cc = s.charAt(i);
- if (cc == '\n') {
- line++;
- pos = 1;
- } else if (cc == ' ' || cc == '\t') {
- pos++;
- }
- i++;
- }
- }
- }
-
- public static Object jsonToData(Current c) {
- c.skipWhiteSpace();
-
- if (c.end()) {
- return "";
- }
-
- char cc = c.get();
- if (cc == '{') {
- c.move();
- return jsonToMap(c);
- }
- if (cc == '[') {
- c.move();
- return jsonToList(c);
- }
- return jsonToObject(c);
- }
-
- private static Object jsonToObject(Current c) {
- if (c.is('"')) {
- c.move();
- StringBuilder ss = new StringBuilder();
- while (!c.end() && c.get() != '"') {
- if (c.get() == '\\') {
- c.move();
- char cc = c.get();
- switch (cc) {
- case '\\':
- ss.append('\\');
- break;
- case '"':
- ss.append('\"');
- break;
- case 'n':
- ss.append('\n');
- break;
- case 'r':
- ss.append('\r');
- break;
- case 't':
- ss.append('\t');
- break;
- default:
- throw new IllegalArgumentException("JSON parsing error: Invalid escaped character at (" + c.line + ':' + c.pos + "): " + cc);
- }
- } else {
- ss.append(c.get());
- }
- c.move();
- }
- if (!c.end()) {
- c.move(); // Skip the closing "
- }
- return ss.toString();
- }
- if (c.is("true")) {
- c.move(4);
- return true;
- }
- if (c.is("false")) {
- c.move(5);
- return false;
- }
- if (c.is("null")) {
- c.move(4);
- return null;
- }
- StringBuilder ss = new StringBuilder();
- while (!c.end() && c.get() != ',' && c.get() != ']' && c.get() != '}' && c.get() != '\n' && c.get() != '\t') {
- ss.append(c.get());
- c.move();
- }
- try {
- return Long.valueOf(ss.toString());
- } catch (Exception e1) {
- try {
- return Double.valueOf(ss.toString());
- } catch (Exception e2) {
- throw new IllegalArgumentException("JSON parsing error: Invalid value at (" + c.line + ':' + c.pos + "): " + ss.toString());
- }
- }
- }
-
- private static List<Object> jsonToList(Current c) {
- List<Object> ll = new ArrayList<>();
- c.skipWhiteSpace();
- while (!c.end() && c.get() != ']') {
- Object value = jsonToData(c);
-
- ll.add(value);
-
- c.skipWhiteSpace();
- if (c.is(',')) {
- c.move();
- c.skipWhiteSpace();
- }
- }
-
- if (!c.end()) {
- c.move(); // Skip the closing ]
- }
-
- return ll;
- }
-
- private static Map<String, Object> jsonToMap(Current c) {
- Map<String, Object> mm = new HashMap<>();
- c.skipWhiteSpace();
- while (!c.end() && c.get() != '}') {
- Object key = jsonToObject(c);
- if (!(key instanceof String)) {
- throw new IllegalArgumentException("JSON parsing error: Invalid key at (" + c.line + ':' + c.pos + "): " + key);
- }
-
- c.skipWhiteSpace();
- if (!c.is(':')) {
- throw new IllegalArgumentException("JSON parsing error: Expected ':' at (" + c.line + ':' + c.pos + ")");
- }
-
- c.move(); // Skip the :
- Object value = jsonToData(c);
-
- mm.put((String) key, value);
-
- c.skipWhiteSpace();
- if (c.is(',')) {
- c.move();
- c.skipWhiteSpace();
- }
- }
-
- if (!c.end()) {
- c.move(); // Skip the closing }
- }
-
- return mm;
- }
-
- public static String dataToJson(Object o, boolean escape) {
- StringBuilder sb = new StringBuilder();
- str(sb, o, 0, false, escape);
- return sb.toString();
- }
-
- public static String dataToJson(Object o) {
- return dataToJson(o, true);
- }
-
- @SuppressWarnings("unchecked")
- private static void str(StringBuilder ss, Object o, int indent, boolean padFirst, boolean escape) {
- if (o == null || o instanceof Boolean || o instanceof Number) {
- if (padFirst) {
- ss.append(pad(indent));
- }
- ss.append(o);
- return;
- }
-
- if (o instanceof String) {
- String s = escape ? escapeJson((String) o) : (String) o;
- if (padFirst) {
- ss.append(pad(indent));
- }
- ss.append('"').append(s).append('"');
- return;
- }
-
- if (o instanceof Number || o instanceof Boolean) {
- if (padFirst) {
- ss.append(pad(indent));
- }
- ss.append(o);
- return;
- }
-
- if (o instanceof Map) {
- Map<String, Object> mm = (Map<String, Object>) o;
-
- if (padFirst) {
- ss.append(pad(indent));
- }
- ss.append("{\n");
-
- boolean first = true;
- for (String k : mm.keySet()) {
- if (!first) {
- ss.append(",\n");
- }
- first = false;
-
- Object v = mm.get(k);
- ss.append(pad(indent + 1));
- if (escape) {
- ss.append('"');
- }
- ss.append(k);
- if (escape) {
- ss.append('"');
- }
- ss.append(": ");
- str(ss, v, indent + 1, false, escape);
- }
-
- ss.append("\n");
- ss.append(pad(indent)).append('}');
-
- return;
- }
-
- if (o instanceof List) {
- List<Object> ll = (List<Object>) o;
-
- if (padFirst) {
- ss.append(pad(indent));
- }
- ss.append("[\n");
-
- boolean first = true;
- for (Object o1 : ll) {
- if (!first) {
- ss.append(",\n");
- }
- first = false;
-
- str(ss, o1, indent + 1, true, escape);
- }
-
- ss.append("\n");
- ss.append(pad(indent)).append(']');
- }
- }
-
- private static String escapeJson(String v) {
- String s = v.replaceAll("\\\\", "\\\\\\\\");
- s = s.replaceAll("\"", "\\\\\"");
- s = s.replaceAll("\n", "\\\\n");
- s = s.replaceAll("\r", "\\\\r");
- s = s.replaceAll("\t", "\\\\t");
- return s;
- }
-
- private static String pad(int n) {
- String s = "";
- for (int i = 0; i < n; i++) {
- s += " ";
- }
- return s;
- }
+ public static Object jsonToData(String s) {
+ if (s == null) {
+ return null;
+ }
+ return jsonToData(new Current(s));
+ }
+
+ private static class Current {
+
+ public String s;
+ public int i;
+ public int line, pos;
+
+ public Current(String s) {
+ this.s = s;
+ i = 0;
+ line = 1;
+ pos = 1;
+ }
+
+ public void move() {
+ i++;
+ pos++;
+ }
+
+ public void move(int k) {
+ i += k;
+ pos += k;
+ }
+
+ public boolean end() {
+ return i >= s.length();
+ }
+
+ public char get() {
+ return s.charAt(i);
+ }
+
+ public boolean is(String ss) {
+ return i < s.length() - ss.length() && s.substring(i, i + ss.length()).equals(ss);
+ }
+
+ public boolean is(char c) {
+ return i < s.length() && s.charAt(i) == c;
+ }
+
+ public void skipWhiteSpace() {
+ while (i < s.length() && s.charAt(i) <= 32) {
+ char cc = s.charAt(i);
+ if (cc == '\n') {
+ line++;
+ pos = 1;
+ } else if (cc == ' ' || cc == '\t') {
+ pos++;
+ }
+ i++;
+ }
+ }
+ }
+
+ public static Object jsonToData(Current c) {
+ c.skipWhiteSpace();
+
+ if (c.end()) {
+ return "";
+ }
+
+ char cc = c.get();
+ if (cc == '{') {
+ c.move();
+ return jsonToMap(c);
+ }
+ if (cc == '[') {
+ c.move();
+ return jsonToList(c);
+ }
+ return jsonToObject(c);
+ }
+
+ private static Object jsonToObject(Current c) {
+ if (c.is('"')) {
+ c.move();
+ StringBuilder ss = new StringBuilder();
+ while (!c.end() && c.get() != '"') {
+ if (c.get() == '\\') {
+ c.move();
+ char cc = c.get();
+ switch (cc) {
+ case '\\':
+ ss.append('\\');
+ break;
+ case '"':
+ ss.append('\"');
+ break;
+ case 'n':
+ ss.append('\n');
+ break;
+ case 'r':
+ ss.append('\r');
+ break;
+ case 't':
+ ss.append('\t');
+ break;
+ default:
+ throw new IllegalArgumentException("JSON parsing error: Invalid escaped character at ("
+ + c.line + ':' + c.pos + "): " + cc);
+ }
+ } else {
+ ss.append(c.get());
+ }
+ c.move();
+ }
+ if (!c.end()) {
+ c.move(); // Skip the closing "
+ }
+ return ss.toString();
+ }
+ if (c.is("true")) {
+ c.move(4);
+ return true;
+ }
+ if (c.is("false")) {
+ c.move(5);
+ return false;
+ }
+ if (c.is("null")) {
+ c.move(4);
+ return null;
+ }
+ StringBuilder ss = new StringBuilder();
+ while (!c.end() && c.get() != ',' && c.get() != ']' && c.get() != '}' && c.get() != '\n' && c.get() != '\t') {
+ ss.append(c.get());
+ c.move();
+ }
+ try {
+ return Long.valueOf(ss.toString());
+ } catch (Exception e1) {
+ try {
+ return Double.valueOf(ss.toString());
+ } catch (Exception e2) {
+ throw new IllegalArgumentException(
+ "JSON parsing error: Invalid value at (" + c.line + ':' + c.pos + "): " + ss.toString());
+ }
+ }
+ }
+
+ private static List<Object> jsonToList(Current c) {
+ List<Object> ll = new ArrayList<>();
+ c.skipWhiteSpace();
+ while (!c.end() && c.get() != ']') {
+ Object value = jsonToData(c);
+
+ ll.add(value);
+
+ c.skipWhiteSpace();
+ if (c.is(',')) {
+ c.move();
+ c.skipWhiteSpace();
+ }
+ }
+
+ if (!c.end()) {
+ c.move(); // Skip the closing ]
+ }
+
+ return ll;
+ }
+
+ private static Map<String, Object> jsonToMap(Current c) {
+ Map<String, Object> mm = new HashMap<>();
+ c.skipWhiteSpace();
+ while (!c.end() && c.get() != '}') {
+ Object key = jsonToObject(c);
+ if (!(key instanceof String)) {
+ throw new IllegalArgumentException(
+ "JSON parsing error: Invalid key at (" + c.line + ':' + c.pos + "): " + key);
+ }
+
+ c.skipWhiteSpace();
+ if (!c.is(':')) {
+ throw new IllegalArgumentException(
+ "JSON parsing error: Expected ':' at (" + c.line + ':' + c.pos + ")");
+ }
+
+ c.move(); // Skip the :
+ Object value = jsonToData(c);
+
+ mm.put((String) key, value);
+
+ c.skipWhiteSpace();
+ if (c.is(',')) {
+ c.move();
+ c.skipWhiteSpace();
+ }
+ }
+
+ if (!c.end()) {
+ c.move(); // Skip the closing }
+ }
+
+ return mm;
+ }
+
+ public static String dataToJson(Object o, boolean escape) {
+ StringBuilder sb = new StringBuilder();
+ str(sb, o, 0, false, escape);
+ return sb.toString();
+ }
+
+ public static String dataToJson(Object o) {
+ return dataToJson(o, true);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void str(StringBuilder ss, Object o, int indent, boolean padFirst, boolean escape) {
+ if (o == null || o instanceof Boolean || o instanceof Number) {
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append(o);
+ return;
+ }
+
+ if (o instanceof String) {
+ String s = escape ? escapeJson((String) o) : (String) o;
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append('"').append(s).append('"');
+ return;
+ }
+
+ if (o instanceof Number || o instanceof Boolean) {
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append(o);
+ return;
+ }
+
+ if (o instanceof Map) {
+ Map<String, Object> mm = (Map<String, Object>) o;
+
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append("{\n");
+
+ boolean first = true;
+ for (String k : mm.keySet()) {
+ if (!first) {
+ ss.append(",\n");
+ }
+ first = false;
+
+ Object v = mm.get(k);
+ ss.append(pad(indent + 1));
+ if (escape) {
+ ss.append('"');
+ }
+ ss.append(k);
+ if (escape) {
+ ss.append('"');
+ }
+ ss.append(": ");
+ str(ss, v, indent + 1, false, escape);
+ }
+
+ ss.append("\n");
+ ss.append(pad(indent)).append('}');
+
+ return;
+ }
+
+ if (o instanceof List) {
+ List<Object> ll = (List<Object>) o;
+
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append("[\n");
+
+ boolean first = true;
+ for (Object o1 : ll) {
+ if (!first) {
+ ss.append(",\n");
+ }
+ first = false;
+
+ str(ss, o1, indent + 1, true, escape);
+ }
+
+ ss.append("\n");
+ ss.append(pad(indent)).append(']');
+ }
+ }
+
+ private static String escapeJson(String v) {
+ String s = v.replaceAll("\\\\", "\\\\\\\\");
+ s = s.replaceAll("\"", "\\\\\"");
+ s = s.replaceAll("\n", "\\\\n");
+ s = s.replaceAll("\r", "\\\\r");
+ s = s.replaceAll("\t", "\\\\t");
+ return s;
+ }
+
+ private static String pad(int n) {
+ String s = "";
+ for (int i = 0; i < n; i++) {
+ s += " ";
+ }
+ return s;
+ }
}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java
index 32e94e5d3..e14e32545 100644
--- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java
@@ -8,84 +8,84 @@ import org.onap.ccsdk.features.lib.doorman.data.MessageStatus;
public class MessageQueueDataItem implements Comparable<MessageQueueDataItem> {
- public Date timeStamp;
- public String extMessageId;
- public MessageStatus status;
- public MessageAction action;
+ public Date timeStamp;
+ public String extMessageId;
+ public MessageStatus status;
+ public MessageAction action;
- @Override
- public int compareTo(MessageQueueDataItem other) {
- int c = timeStamp.compareTo(other.timeStamp);
- if (c == 0) {
- if (action != null && other.status != null) {
- return -1;
- }
- if (status != null && other.action != null) {
- return 1;
- }
- }
- return c;
- }
+ @Override
+ public int compareTo(MessageQueueDataItem other) {
+ int c = timeStamp.compareTo(other.timeStamp);
+ if (c == 0) {
+ if (action != null && other.status != null) {
+ return -1;
+ }
+ if (status != null && other.action != null) {
+ return 1;
+ }
+ }
+ return c;
+ }
- @Override
- public int hashCode() {
- return System.identityHashCode(extMessageId);
- }
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(extMessageId);
+ }
- @Override
- public boolean equals(Object o) {
- if (o == null || !(o instanceof MessageQueueDataItem)) {
- return false;
- }
- MessageQueueDataItem other = (MessageQueueDataItem) o;
- if (!extMessageId.equals(other.extMessageId)) {
- return false;
- }
- if (status != null && (other.status == null || status.status != other.status.status)) {
- return false;
- }
- if (action != null) {
- if (other.action == null || action.action != other.action.action) {
- return false;
- }
- if (action.action == MessageActionValue.HOLD || action.action == MessageActionValue.RETURN_HOLD) {
- if (action.holdTime != other.action.holdTime) {
- return false;
- }
- } else if (action.action == MessageActionValue.RETURN_COMPLETE) {
- if (!action.resolution.equals(other.action.resolution)) {
- return false;
- }
- }
- }
- return true;
- }
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || !(o instanceof MessageQueueDataItem)) {
+ return false;
+ }
+ MessageQueueDataItem other = (MessageQueueDataItem) o;
+ if (!extMessageId.equals(other.extMessageId)) {
+ return false;
+ }
+ if (status != null && (other.status == null || status.getStatus() != other.status.getStatus())) {
+ return false;
+ }
+ if (action != null) {
+ if (other.action == null || action.getAction() != other.action.getAction()) {
+ return false;
+ }
+ if (action.getAction() == MessageActionValue.HOLD || action.getAction() == MessageActionValue.RETURN_HOLD) {
+ if (action.getHoldTime() != other.action.getHoldTime()) {
+ return false;
+ }
+ } else if (action.getAction() == MessageActionValue.RETURN_COMPLETE) {
+ if (!action.getResolution().equals(other.action.getResolution())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
- @Override
- public String toString() {
- StringBuilder ss = new StringBuilder();
- if (timeStamp != null) {
- ss.append(df.format(timeStamp)).append(" | ");
- }
- if (status != null) {
- ss.append("STATUS: ");
- } else {
- ss.append("ACTION: ");
- }
- ss.append(String.format("%-20s | ", extMessageId));
- if (status != null) {
- ss.append(status.status);
- } else {
- ss.append(action.action);
- if (action.holdTime > 0) {
- ss.append(" | ").append(action.holdTime);
- }
- if (action.resolution != null) {
- ss.append(" | ").append(action.resolution);
- }
- }
- return ss.toString();
- }
+ @Override
+ public String toString() {
+ StringBuilder ss = new StringBuilder();
+ if (timeStamp != null) {
+ ss.append(df.format(timeStamp)).append(" | ");
+ }
+ if (status != null) {
+ ss.append("STATUS: ");
+ } else {
+ ss.append("ACTION: ");
+ }
+ ss.append(String.format("%-20s | ", extMessageId));
+ if (status != null) {
+ ss.append(status.getStatus());
+ } else {
+ ss.append(action.getAction());
+ if (action.getHoldTime() > 0) {
+ ss.append(" | ").append(action.getHoldTime());
+ }
+ if (action.getResolution() != null) {
+ ss.append(" | ").append(action.getResolution());
+ }
+ }
+ return ss.toString();
+ }
- private static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ private static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java
index ca1c4910d..b2f69dbcf 100644
--- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java
@@ -32,191 +32,195 @@ import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
public class MessageQueueTest {
- private static final Logger log = LoggerFactory.getLogger(MessageQueueTest.class);
-
- private String test;
-
- public MessageQueueTest(String test) {
- this.test = test;
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void test() throws Exception {
- log.info("#########################################################################");
- log.info("MessageQueueTest: " + test + " started");
-
- String testSetupJson = readResource("it/" + test + "/test.json");
- Map<String, Object> testSetup = (Map<String, Object>) JsonUtil.jsonToData(testSetupJson);
-
- Map<String, MessageQueueHandler> handlerMap = new HashMap<>();
-
- for (Object o : (List<Object>) testSetup.get("handler_list")) {
- Map<String, Object> handlerSetup = (Map<String, Object>) o;
- String queueType = (String) handlerSetup.get("queue_type");
- String handlerClassName = (String) handlerSetup.get("handler_class");
- try {
- Class<?> handlerClass = Class.forName("org.onap.ccsdk.features.lib.doorman.it." + test + "." + handlerClassName);
- MessageQueueHandler handler = (MessageQueueHandler) handlerClass.newInstance();
- handlerMap.put(queueType, handler);
- log.info("Handler found for queue type: " + queueType + ": " + handlerClass.getName());
- } catch (Exception e) {
- }
- }
-
- MessageInterceptorFactory factory = setupMessageInterceptorFactory(handlerMap, new Classifier(), new Processor());
-
- List<Object> requestList = (List<Object>) testSetup.get("request_list");
-
- List<Thread> threadList = new ArrayList<>();
-
- for (int i = 0; i < requestList.size(); i++) {
- Map<String, Object> requestSetup = (Map<String, Object>) requestList.get(i);
-
- MessageData request = new MessageData();
- request.param = (Map<String, Object>) requestSetup.get("request_param");
- Map<String, Object> requestBodyData = (Map<String, Object>) requestSetup.get("request_body");
- if (requestBodyData != null) {
- request.body = JsonUtil.dataToJson(requestBodyData);
- } else {
- request.body = readResource("it/" + test + "/" + requestSetup.get("request_body_file"));
- }
-
- MessageData response = new MessageData();
- response.param = (Map<String, Object>) requestSetup.get("response_param");
- Map<String, Object> responseBodyData = (Map<String, Object>) requestSetup.get("response_body");
- if (responseBodyData != null) {
- response.body = JsonUtil.dataToJson(responseBodyData);
- } else {
- response.body = readResource("it/" + test + "/" + requestSetup.get("response_body_file"));
- }
-
- long startTime = (Long) requestSetup.get("start_time");
- long processTime = (Long) requestSetup.get("process_time");
-
- MessageInterceptor interceptor = factory.create();
-
- Thread t = new Thread((Runnable) () -> {
- try {
- Thread.sleep(startTime);
- } catch (InterruptedException e) {
- }
-
- MessageData r = interceptor.processRequest(request);
-
- if (r == null) {
- try {
- Thread.sleep(processTime);
- } catch (InterruptedException e) {
- }
-
- interceptor.processResponse(response);
- }
-
- }, "Message-" + i);
-
- threadList.add(t);
- t.start();
- }
-
- for (Thread t : threadList) {
- t.join();
- }
-
- log.info("MessageQueueTest: " + test + " completed");
- log.info("Result:");
-
- String testResultJson = readResource("it/" + test + "/result.json");
- Map<String, Object> testResult = (Map<String, Object>) JsonUtil.jsonToData(testResultJson);
- MessageQueueTestResult.checkResult(testResult);
- }
-
- private static class Classifier implements MessageClassifier {
-
- @Override
- public Queue determineQueue(MessageData request) {
- Queue q = new Queue();
- q.type = (String) request.param.get("queue_type");
- q.id = (String) request.param.get("queue_id");
- return q;
- }
-
- @Override
- public String getExtMessageId(MessageData request) {
- return (String) request.param.get("request_id");
- }
- }
-
- private static class Processor implements MessageProcessor {
-
- @Override
- public void processMessage(MessageData request) {
- long processTime = (Long) request.param.get("process_time");
- try {
- Thread.sleep(processTime);
- } catch (InterruptedException e) {
- }
- }
- }
-
- public static MessageInterceptorFactory setupMessageInterceptorFactory(Map<String, MessageQueueHandler> handlerMap, MessageClassifier classifier, MessageProcessor processor) {
- LockHelperImpl lockHelper = new LockHelperImpl();
- lockHelper.setDataSource(DbUtil.getDataSource());
- lockHelper.setLockWait(5);
- lockHelper.setRetryCount(10);
-
- MessageDaoImpl messageDao = new MessageDaoImpl();
- messageDao.setDataSource(DbUtil.getDataSource());
-
- MessageInterceptorFactoryImpl f = new MessageInterceptorFactoryImpl();
- f.setMessageDao(messageDao);
- f.setLockHelper(lockHelper);
- f.setLockTimeout(20);
- f.setHandlerMap(handlerMap);
- f.setMessageClassifier(classifier);
- f.setMessageProcessor(processor);
- return f;
- }
-
- @Parameters(name = "{0}")
- public static Collection<Object[]> allTests() {
- List<Object[]> ll = new ArrayList<>();
-
- String[] tcList = list("it");
- for (String tc : tcList) {
- ll.add(new Object[] { tc });
- }
- return ll;
- }
-
- private static String[] list(String dir) {
- try {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- URL url = loader.getResource(dir);
- String path = url.getPath();
- return new File(path).list();
- } catch (Exception e) {
- log.warn("Error getting directory list for: " + dir + ": " + e.getMessage(), e);
- return new String[0];
- }
- }
-
- private static String readResource(String resource) {
- try {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- InputStream ins = loader.getResourceAsStream(resource);
- BufferedReader in = new BufferedReader(new InputStreamReader(ins));
- StringBuilder ss = new StringBuilder();
- String line = in.readLine();
- while (line != null) {
- ss.append(line).append('\n');
- line = in.readLine();
- }
- in.close();
- return ss.toString();
- } catch (Exception e) {
- log.warn("Error reading resource: " + resource + ": " + e.getMessage(), e);
- return null;
- }
- }
+ private static final Logger log = LoggerFactory.getLogger(MessageQueueTest.class);
+
+ private String test;
+
+ public MessageQueueTest(String test) {
+ this.test = test;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void test() throws Exception {
+ log.info("#########################################################################");
+ log.info("MessageQueueTest: " + test + " started");
+
+ String testSetupJson = readResource("it/" + test + "/test.json");
+ Map<String, Object> testSetup = (Map<String, Object>) JsonUtil.jsonToData(testSetupJson);
+
+ Map<String, MessageQueueHandler> handlerMap = new HashMap<>();
+
+ for (Object o : (List<Object>) testSetup.get("handler_list")) {
+ Map<String, Object> handlerSetup = (Map<String, Object>) o;
+ String queueType = (String) handlerSetup.get("queue_type");
+ String handlerClassName = (String) handlerSetup.get("handler_class");
+ try {
+ Class<?> handlerClass =
+ Class.forName("org.onap.ccsdk.features.lib.doorman.it." + test + "." + handlerClassName);
+ MessageQueueHandler handler = (MessageQueueHandler) handlerClass.newInstance();
+ handlerMap.put(queueType, handler);
+ log.info("Handler found for queue type: " + queueType + ": " + handlerClass.getName());
+ } catch (Exception e) {
+ }
+ }
+
+ MessageInterceptorFactory factory =
+ setupMessageInterceptorFactory(handlerMap, new Classifier(), new Processor());
+
+ List<Object> requestList = (List<Object>) testSetup.get("request_list");
+
+ List<Thread> threadList = new ArrayList<>();
+
+ for (int i = 0; i < requestList.size(); i++) {
+ Map<String, Object> requestSetup = (Map<String, Object>) requestList.get(i);
+
+ Map<String, Object> requestParam = (Map<String, Object>) requestSetup.get("request_param");
+ Map<String, Object> requestBodyData = (Map<String, Object>) requestSetup.get("request_body");
+ String requestBody = null;
+ if (requestBodyData != null) {
+ requestBody = JsonUtil.dataToJson(requestBodyData);
+ } else {
+ requestBody = readResource("it/" + test + "/" + requestSetup.get("request_body_file"));
+ }
+ MessageData request = new MessageData(requestParam, requestBody);
+
+ Map<String, Object> responseParam = (Map<String, Object>) requestSetup.get("response_param");
+ Map<String, Object> responseBodyData = (Map<String, Object>) requestSetup.get("response_body");
+ String responseBody = null;
+ if (responseBodyData != null) {
+ responseBody = JsonUtil.dataToJson(responseBodyData);
+ } else {
+ responseBody = readResource("it/" + test + "/" + requestSetup.get("response_body_file"));
+ }
+ MessageData response = new MessageData(responseParam, responseBody);
+
+ long startTime = (Long) requestSetup.get("start_time");
+ long processTime = (Long) requestSetup.get("process_time");
+
+ MessageInterceptor interceptor = factory.create();
+
+ Thread t = new Thread((Runnable) () -> {
+ try {
+ Thread.sleep(startTime);
+ } catch (InterruptedException e) {
+ }
+
+ MessageData r = interceptor.processRequest(request);
+
+ if (r == null) {
+ try {
+ Thread.sleep(processTime);
+ } catch (InterruptedException e) {
+ }
+
+ interceptor.processResponse(response);
+ }
+
+ }, "Message-" + i);
+
+ threadList.add(t);
+ t.start();
+ }
+
+ for (Thread t : threadList) {
+ t.join();
+ }
+
+ log.info("MessageQueueTest: " + test + " completed");
+ log.info("Result:");
+
+ String testResultJson = readResource("it/" + test + "/result.json");
+ Map<String, Object> testResult = (Map<String, Object>) JsonUtil.jsonToData(testResultJson);
+ MessageQueueTestResult.checkResult(testResult);
+ }
+
+ private static class Classifier implements MessageClassifier {
+
+ @Override
+ public Queue determineQueue(MessageData request) {
+ String queueType = (String) request.getParam().get("queue_type");
+ String queueId = (String) request.getParam().get("queue_id");
+ return new Queue(queueType, queueId);
+ }
+
+ @Override
+ public String getExtMessageId(MessageData request) {
+ return (String) request.getParam().get("request_id");
+ }
+ }
+
+ private static class Processor implements MessageProcessor {
+
+ @Override
+ public void processMessage(MessageData request) {
+ long processTime = (Long) request.getParam().get("process_time");
+ try {
+ Thread.sleep(processTime);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ public static MessageInterceptorFactory setupMessageInterceptorFactory(Map<String, MessageQueueHandler> handlerMap,
+ MessageClassifier classifier, MessageProcessor processor) {
+ LockHelperImpl lockHelper = new LockHelperImpl();
+ lockHelper.setDataSource(DbUtil.getDataSource());
+ lockHelper.setLockWait(5);
+ lockHelper.setRetryCount(10);
+
+ MessageDaoImpl messageDao = new MessageDaoImpl();
+ messageDao.setDataSource(DbUtil.getDataSource());
+
+ MessageInterceptorFactoryImpl f = new MessageInterceptorFactoryImpl();
+ f.setMessageDao(messageDao);
+ f.setLockHelper(lockHelper);
+ f.setLockTimeout(20);
+ f.setHandlerMap(handlerMap);
+ f.setMessageClassifier(classifier);
+ f.setMessageProcessor(processor);
+ return f;
+ }
+
+ @Parameters(name = "{0}")
+ public static Collection<Object[]> allTests() {
+ List<Object[]> ll = new ArrayList<>();
+
+ String[] tcList = list("it");
+ for (String tc : tcList) {
+ ll.add(new Object[] {tc});
+ }
+ return ll;
+ }
+
+ private static String[] list(String dir) {
+ try {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ URL url = loader.getResource(dir);
+ String path = url.getPath();
+ return new File(path).list();
+ } catch (Exception e) {
+ log.warn("Error getting directory list for: " + dir + ": " + e.getMessage(), e);
+ return new String[0];
+ }
+ }
+
+ private static String readResource(String resource) {
+ try {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ InputStream ins = loader.getResourceAsStream(resource);
+ BufferedReader in = new BufferedReader(new InputStreamReader(ins));
+ StringBuilder ss = new StringBuilder();
+ String line = in.readLine();
+ while (line != null) {
+ ss.append(line).append('\n');
+ line = in.readLine();
+ }
+ in.close();
+ return ss.toString();
+ } catch (Exception e) {
+ log.warn("Error reading resource: " + resource + ": " + e.getMessage(), e);
+ return null;
+ }
+ }
}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java
index 71a2eeafb..9d297d52f 100644
--- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java
@@ -1,7 +1,6 @@
package org.onap.ccsdk.features.lib.doorman.it;
import static org.junit.Assert.fail;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -19,102 +18,101 @@ import org.slf4j.LoggerFactory;
public class MessageQueueTestResult {
- private static final Logger log = LoggerFactory.getLogger(MessageQueueTest.class);
+ private static final Logger log = LoggerFactory.getLogger(MessageQueueTest.class);
- public static List<MessageQueueDataItem> readResult(String queueType, String queueId) {
- MessageDaoImpl messageDao = new MessageDaoImpl();
- messageDao.setDataSource(DbUtil.getDataSource());
+ public static List<MessageQueueDataItem> readResult(String queueType, String queueId) {
+ MessageDaoImpl messageDao = new MessageDaoImpl();
+ messageDao.setDataSource(DbUtil.getDataSource());
- Queue q = new Queue();
- q.type = queueType;
- q.id = queueId;
+ Queue q = new Queue(queueType, queueId);
- List<Message> messageList = messageDao.readMessageQueue(q);
+ List<Message> messageList = messageDao.readMessageQueue(q);
- List<MessageQueueDataItem> ll = new ArrayList<>();
- if (messageList != null) {
- for (Message m : messageList) {
- if (m.statusHistory != null) {
- for (MessageStatus s : m.statusHistory) {
- MessageQueueDataItem item = new MessageQueueDataItem();
- item.extMessageId = m.extMessageId;
- item.status = s;
- item.timeStamp = s.timestamp;
- ll.add(item);
- }
- }
- if (m.actionHistory != null) {
- for (MessageAction a : m.actionHistory) {
- MessageQueueDataItem item = new MessageQueueDataItem();
- item.extMessageId = m.extMessageId;
- item.action = a;
- item.timeStamp = a.timestamp;
- ll.add(item);
- }
- }
- }
- }
- Collections.sort(ll);
- return ll;
- }
+ List<MessageQueueDataItem> ll = new ArrayList<>();
+ if (messageList != null) {
+ for (Message m : messageList) {
+ if (m.getStatusHistory() != null) {
+ for (MessageStatus s : m.getStatusHistory()) {
+ MessageQueueDataItem item = new MessageQueueDataItem();
+ item.extMessageId = m.getExtMessageId();
+ item.status = s;
+ item.timeStamp = s.getTimestamp();
+ ll.add(item);
+ }
+ }
+ if (m.getActionHistory() != null) {
+ for (MessageAction a : m.getActionHistory()) {
+ MessageQueueDataItem item = new MessageQueueDataItem();
+ item.extMessageId = m.getExtMessageId();
+ item.action = a;
+ item.timeStamp = a.getTimestamp();
+ ll.add(item);
+ }
+ }
+ }
+ }
+ Collections.sort(ll);
+ return ll;
+ }
- @SuppressWarnings("unchecked")
- public static void checkResult(Map<String, Object> testResult) {
- List<Object> resultSetupList = (List<Object>) testResult.get("queue_list");
- if (resultSetupList != null) {
- for (Object o : resultSetupList) {
- Map<String, Object> resultSetup = (Map<String, Object>) o;
- String queueType = (String) resultSetup.get("queue_type");
- String queueId = (String) resultSetup.get("queue_id");
- log.info(queueType + "::" + queueId);
- List<MessageQueueDataItem> itemList = MessageQueueTestResult.readResult(queueType, queueId);
- for (MessageQueueDataItem item : itemList) {
- log.info(" --- " + item);
- }
+ @SuppressWarnings("unchecked")
+ public static void checkResult(Map<String, Object> testResult) {
+ List<Object> resultSetupList = (List<Object>) testResult.get("queue_list");
+ if (resultSetupList != null) {
+ for (Object o : resultSetupList) {
+ Map<String, Object> resultSetup = (Map<String, Object>) o;
+ String queueType = (String) resultSetup.get("queue_type");
+ String queueId = (String) resultSetup.get("queue_id");
+ log.info(queueType + "::" + queueId);
+ List<MessageQueueDataItem> itemList = MessageQueueTestResult.readResult(queueType, queueId);
+ for (MessageQueueDataItem item : itemList) {
+ log.info(" --- " + item);
+ }
- List<Object> checkSequenceList = (List<Object>) resultSetup.get("check_sequence_list");
- for (int i = 0; i < checkSequenceList.size(); i++) {
- List<Object> checkSequence = (List<Object>) checkSequenceList.get(i);
- List<MessageQueueDataItem> checkItemList = new ArrayList<>();
- for (Object o2 : checkSequence) {
- String[] ss = ((String) o2).split("\\|");
- MessageQueueDataItem item = new MessageQueueDataItem();
- item.extMessageId = ss[1].trim();
- if (ss[0].trim().equals("STATUS")) {
- item.status = new MessageStatus();
- item.status.status = MessageStatusValue.valueOf(ss[2].trim());
- } else {
- item.action = new MessageAction();
- item.action.action = MessageActionValue.valueOf(ss[2].trim());
- if (item.action.action == MessageActionValue.HOLD || item.action.action == MessageActionValue.RETURN_HOLD) {
- item.action.holdTime = Integer.parseInt(ss[3].trim());
- } else if (item.action.action == MessageActionValue.RETURN_COMPLETE) {
- item.action.resolution = ss[3].trim();
- }
- }
- checkItemList.add(item);
- }
- List<MessageQueueDataItem> itemList1 = new ArrayList<>(itemList);
- itemList1.retainAll(checkItemList);
- if (!itemList1.equals(checkItemList)) {
- log.info("Expected sequence #" + i + " not found");
- log.info("Expected sequence:");
- for (MessageQueueDataItem item : checkItemList) {
- log.info(" --- " + item);
- }
- log.info("Found sequence:");
- for (MessageQueueDataItem item : itemList1) {
- log.info(" --- " + item);
- }
- fail("Expected sequence #" + i + " not found");
- } else {
- log.info("Expected sequence #" + i + " found in the result:");
- for (MessageQueueDataItem item : checkItemList) {
- log.info(" --- " + item);
- }
- }
- }
- }
- }
- }
+ List<Object> checkSequenceList = (List<Object>) resultSetup.get("check_sequence_list");
+ for (int i = 0; i < checkSequenceList.size(); i++) {
+ List<Object> checkSequence = (List<Object>) checkSequenceList.get(i);
+ List<MessageQueueDataItem> checkItemList = new ArrayList<>();
+ for (Object o2 : checkSequence) {
+ String[] ss = ((String) o2).split("\\|");
+ MessageQueueDataItem item = new MessageQueueDataItem();
+ item.extMessageId = ss[1].trim();
+ if (ss[0].trim().equals("STATUS")) {
+ item.status = new MessageStatus(MessageStatusValue.valueOf(ss[2].trim()));
+ } else {
+ MessageActionValue action = MessageActionValue.valueOf(ss[2].trim());
+ int holdTime = 0;
+ String resolution = null;
+ if (action == MessageActionValue.HOLD || action == MessageActionValue.RETURN_HOLD) {
+ holdTime = Integer.parseInt(ss[3].trim());
+ } else if (action == MessageActionValue.RETURN_COMPLETE) {
+ resolution = ss[3].trim();
+ }
+ item.action = new MessageAction(action, resolution, holdTime, null);
+ }
+ checkItemList.add(item);
+ }
+ List<MessageQueueDataItem> itemList1 = new ArrayList<>(itemList);
+ itemList1.retainAll(checkItemList);
+ if (!itemList1.equals(checkItemList)) {
+ log.info("Expected sequence #" + i + " not found");
+ log.info("Expected sequence:");
+ for (MessageQueueDataItem item : checkItemList) {
+ log.info(" --- " + item);
+ }
+ log.info("Found sequence:");
+ for (MessageQueueDataItem item : itemList1) {
+ log.info(" --- " + item);
+ }
+ fail("Expected sequence #" + i + " not found");
+ } else {
+ log.info("Expected sequence #" + i + " found in the result:");
+ for (MessageQueueDataItem item : checkItemList) {
+ log.info(" --- " + item);
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java
deleted file mode 100644
index 254fd4ce4..000000000
--- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.onap.ccsdk.features.lib.doorman.it.tc1;
-
-import org.onap.ccsdk.features.lib.doorman.MessageClassifier;
-import org.onap.ccsdk.features.lib.doorman.data.MessageData;
-import org.onap.ccsdk.features.lib.doorman.data.Queue;
-
-public class Classifier implements MessageClassifier {
-
- @Override
- public Queue determineQueue(MessageData request) {
- Queue q = new Queue();
- q.type = "Cluster";
- q.id = "test-queue";
- return q;
- }
-
- @Override
- public String getExtMessageId(MessageData request) {
- return (String) request.param.get("request_id");
- }
-}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java
index f57fdbc36..8ee7b6a89 100644
--- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java
@@ -4,7 +4,7 @@ import org.onap.ccsdk.features.lib.doorman.impl.MessageHandlerBaseImpl;
public class Handler extends MessageHandlerBaseImpl {
- public Handler() {
+ public Handler() {
- }
+ }
}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java
deleted file mode 100644
index a32f74650..000000000
--- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.onap.ccsdk.features.lib.doorman.it.tc2;
-
-import org.onap.ccsdk.features.lib.doorman.MessageClassifier;
-import org.onap.ccsdk.features.lib.doorman.data.MessageData;
-import org.onap.ccsdk.features.lib.doorman.data.Queue;
-
-public class Classifier implements MessageClassifier {
-
- @Override
- public Queue determineQueue(MessageData request) {
- Queue q = new Queue();
- q.type = "Cluster";
- q.id = "test-queue";
- return q;
- }
-
- @Override
- public String getExtMessageId(MessageData request) {
- return (String) request.param.get("request_id");
- }
-}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java
index 35b811c5f..0be64372d 100644
--- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java
@@ -9,45 +9,44 @@ import org.onap.ccsdk.features.lib.doorman.util.JsonUtil;
public class Handler extends MessageHandlerBaseImpl {
- public Handler() {
- setMaxParallelCount(100);
- setUpdateWaitTime(20);
- }
+ public Handler() {
+ setMaxParallelCount(100);
+ setUpdateWaitTime(20);
+ }
- @SuppressWarnings("unchecked")
- @Override
- protected String determineUpdateGroup(Message msg) {
- if (msg.request.body != null) {
- Map<String, Object> body = (Map<String, Object>) JsonUtil.jsonToData(msg.request.body);
- String op = (String) body.get("operation");
- String entityId = (String) body.get("entity_id");
- if ("update".equals(op)) {
- return entityId;
- }
- }
- return super.determineUpdateGroup(msg);
- }
+ @SuppressWarnings("unchecked")
+ @Override
+ protected String determineUpdateGroup(Message msg) {
+ if (msg.getRequest().getBody() != null) {
+ Map<String, Object> body = (Map<String, Object>) JsonUtil.jsonToData(msg.getRequest().getBody());
+ String op = (String) body.get("operation");
+ String entityId = (String) body.get("entity_id");
+ if ("update".equals(op)) {
+ return entityId;
+ }
+ }
+ return super.determineUpdateGroup(msg);
+ }
- @Override
- protected long determineUpdateSequence(Message msg) {
- if (msg.request.param != null) {
- Long n = (Long) msg.request.param.get("sequence_number");
- if (n != null) {
- return n;
- }
- }
- return super.determineUpdateSequence(msg);
- }
+ @Override
+ protected long determineUpdateSequence(Message msg) {
+ if (msg.getRequest().getParam() != null) {
+ Long n = (Long) msg.getRequest().getParam().get("sequence_number");
+ if (n != null) {
+ return n;
+ }
+ }
+ return super.determineUpdateSequence(msg);
+ }
- @Override
- protected MessageData completeResponse(Resolution r, Message msg) {
- if (r == Resolution.SKIPPED) {
- MessageData response = new MessageData();
- response.param = new HashMap<>();
- response.param.put("http_code", 200L);
- response.body = "{ \"status\": \"success\" }";
- return response;
- }
- return super.completeResponse(r, msg);
- }
+ @Override
+ protected MessageData completeResponse(Resolution r, Message msg) {
+ if (r == Resolution.SKIPPED) {
+ Map<String, Object> param = new HashMap<>();
+ param.put("http_code", 200L);
+ String body = "{ \"status\": \"success\" }";
+ return new MessageData(param, body);
+ }
+ return super.completeResponse(r, msg);
+ }
}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java
index 1e525b4b2..b6ce6ca03 100644
--- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java
@@ -6,87 +6,83 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
-
import javax.sql.DataSource;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DbUtil {
- private static final Logger log = LoggerFactory.getLogger(DbUtil.class);
-
- private static DataSource dataSource = null;
-
- public static synchronized DataSource getDataSource() {
- if (dataSource == null) {
- String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1";
-
- dataSource = new DataSource() {
-
- @Override
- public <T> T unwrap(Class<T> arg0) throws SQLException {
- return null;
- }
-
- @Override
- public boolean isWrapperFor(Class<?> arg0) throws SQLException {
- return false;
- }
-
- @Override
- public void setLoginTimeout(int arg0) throws SQLException {
- }
-
- @Override
- public void setLogWriter(PrintWriter arg0) throws SQLException {
- }
-
- @Override
- public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
- return null;
- }
-
- @Override
- public int getLoginTimeout() throws SQLException {
- return 0;
- }
-
- @Override
- public PrintWriter getLogWriter() throws SQLException {
- return null;
- }
-
- @Override
- public Connection getConnection(String username, String password) throws SQLException {
- return null;
- }
-
- @Override
- public Connection getConnection() throws SQLException {
- return DriverManager.getConnection(url);
- }
- };
-
- try {
- String script = FileUtil.read("/schema.sql");
-
- String[] sqlList = script.split(";");
- try (Connection con = dataSource.getConnection()) {
- for (String sql : sqlList) {
- if (!sql.trim().isEmpty()) {
- sql = sql.trim();
- try (PreparedStatement ps = con.prepareStatement(sql)) {
- log.info("Executing statement:\n" + sql);
- ps.execute();
- }
- }
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- return dataSource;
- }
+ private static final Logger log = LoggerFactory.getLogger(DbUtil.class);
+
+ private static DataSource dataSource = null;
+
+ public static synchronized DataSource getDataSource() {
+ if (dataSource == null) {
+ String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1";
+
+ dataSource = new DataSource() {
+
+ @Override
+ public <T> T unwrap(Class<T> arg0) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> arg0) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setLoginTimeout(int arg0) throws SQLException {}
+
+ @Override
+ public void setLogWriter(PrintWriter arg0) throws SQLException {}
+
+ @Override
+ public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
+ return null;
+ }
+
+ @Override
+ public int getLoginTimeout() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public PrintWriter getLogWriter() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection(String username, String password) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return DriverManager.getConnection(url);
+ }
+ };
+
+ try {
+ String script = FileUtil.read("/schema.sql");
+
+ String[] sqlList = script.split(";");
+ try (Connection con = dataSource.getConnection()) {
+ for (String sql : sqlList) {
+ if (!sql.trim().isEmpty()) {
+ sql = sql.trim();
+ try (PreparedStatement ps = con.prepareStatement(sql)) {
+ log.info("Executing statement:\n" + sql);
+ ps.execute();
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return dataSource;
+ }
}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java
index 5ae92b4fc..4958fe376 100644
--- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java
@@ -6,19 +6,19 @@ import java.io.InputStreamReader;
public class FileUtil {
- public static String read(String fileName) throws Exception {
- String ss = "";
- try (InputStream is = DbUtil.class.getResourceAsStream(fileName)) {
- try (InputStreamReader isr = new InputStreamReader(is)) {
- try (BufferedReader in = new BufferedReader(isr)) {
- String s = in.readLine();
- while (s != null) {
- ss += s + '\n';
- s = in.readLine();
- }
- }
- }
- }
- return ss;
- }
+ public static String read(String fileName) throws Exception {
+ String ss = "";
+ try (InputStream is = DbUtil.class.getResourceAsStream(fileName)) {
+ try (InputStreamReader isr = new InputStreamReader(is)) {
+ try (BufferedReader in = new BufferedReader(isr)) {
+ String s = in.readLine();
+ while (s != null) {
+ ss += s + '\n';
+ s = in.readLine();
+ }
+ }
+ }
+ }
+ return ss;
+ }
}
diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java
index f0ac87e24..7d4bd9456 100644
--- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java
+++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java
@@ -1,7 +1,6 @@
package org.onap.ccsdk.features.lib.doorman.util;
import java.util.Map;
-
import org.junit.Assert;
import org.junit.Test;
import org.onap.ccsdk.features.lib.doorman.testutil.FileUtil;
@@ -11,16 +10,16 @@ import org.slf4j.LoggerFactory;
public class TestJsonUtil {
- private static final Logger log = LoggerFactory.getLogger(TestJsonUtil.class);
+ private static final Logger log = LoggerFactory.getLogger(TestJsonUtil.class);
- @SuppressWarnings("unchecked")
- @Test
- public void testJsonConversion() throws Exception {
- String json1 = FileUtil.read("/test1.json");
- Map<String, Object> data1 = (Map<String, Object>) JsonUtil.jsonToData(json1);
- String convJson1 = JsonUtil.dataToJson(data1);
- log.info("Converted JSON:\n" + convJson1);
- Map<String, Object> convData1 = (Map<String, Object>) JsonUtil.jsonToData(convJson1);
- Assert.assertEquals(data1, convData1);
- }
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testJsonConversion() throws Exception {
+ String json1 = FileUtil.read("/test1.json");
+ Map<String, Object> data1 = (Map<String, Object>) JsonUtil.jsonToData(json1);
+ String convJson1 = JsonUtil.dataToJson(data1);
+ log.info("Converted JSON:\n" + convJson1);
+ Map<String, Object> convData1 = (Map<String, Object>) JsonUtil.jsonToData(convJson1);
+ Assert.assertEquals(data1, convData1);
+ }
}