diff options
Diffstat (limited to 'lib/doorman')
33 files changed, 2549 insertions, 2391 deletions
diff --git a/lib/doorman/pom.xml b/lib/doorman/pom.xml index e7883a944..1e902d6aa 100644 --- a/lib/doorman/pom.xml +++ b/lib/doorman/pom.xml @@ -1,36 +1,35 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <parent> + <parent> <groupId>org.onap.ccsdk.features</groupId> <artifactId>ccsdk-features</artifactId> <version>1.0.0-SNAPSHOT</version> - </parent> + <relativePath>../..</relativePath> + </parent> <groupId>org.onap.ccsdk.features.lib.doorman</groupId> <artifactId>doorman</artifactId> - <description>Doorman - Request prioritization and agregation queue</description> + <description>Doorman - Request prioritization and aggregation queue</description> <dependencies> <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> - <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> - <scope>provided</scope> </dependency> <dependency> <groupId>org.onap.ccsdk.features.lib.rlock</groupId> <artifactId>rlock</artifactId> <version>1.0.0-SNAPSHOT</version> - <scope>provided</scope> </dependency> <dependency> diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java index b6af1731e..e31553c4d 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java @@ -5,7 +5,7 @@ import org.onap.ccsdk.features.lib.doorman.data.Queue; public interface MessageClassifier { - Queue determineQueue(MessageData request); + Queue determineQueue(MessageData request); - String getExtMessageId(MessageData request); + String getExtMessageId(MessageData request); } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java index 1ff811baa..36dacc64e 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java @@ -4,7 +4,7 @@ import org.onap.ccsdk.features.lib.doorman.data.MessageData; public interface MessageInterceptor { - MessageData processRequest(MessageData request); + MessageData processRequest(MessageData request); - void processResponse(MessageData response); + void processResponse(MessageData response); } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java index 4c9886425..124f2f471 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java @@ -2,5 +2,5 @@ package org.onap.ccsdk.features.lib.doorman; public interface MessageInterceptorFactory { - MessageInterceptor create(); + MessageInterceptor create(); } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java index 0bb05a34c..7797ee095 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java @@ -4,5 +4,5 @@ import org.onap.ccsdk.features.lib.doorman.data.MessageData; public interface MessageProcessor { - void processMessage(MessageData request); + void processMessage(MessageData request); } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java index 4285393a4..55d7e0de9 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java @@ -8,5 +8,5 @@ import org.onap.ccsdk.features.lib.doorman.data.MessageAction; public interface MessageQueueHandler { - Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue); + Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue); } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java index 764faa9c8..552635c57 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java @@ -10,21 +10,21 @@ import org.onap.ccsdk.features.lib.doorman.data.Queue; public interface MessageDao { - long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp); + long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp); - void updateMessageStarted(long messageId, Date timestamp); + void updateMessageStarted(long messageId, Date timestamp); - void updateMessageCompleted(long messageId, String resolution, Date timestamp); + void updateMessageCompleted(long messageId, String resolution, Date timestamp); - void updateMessageResponse(long messageId, Date timestamp, MessageData response); + void updateMessageResponse(long messageId, Date timestamp, MessageData response); - void addStatus(long messageId, MessageStatus status); + void addStatus(long messageId, MessageStatus status); - void addAction(long messageId, MessageAction action); + void addAction(long messageId, MessageAction action); - void updateActionDone(long actionId, Date now); + void updateActionDone(long actionId, Date now); - List<Message> readMessageQueue(Queue queue); + List<Message> readMessageQueue(Queue queue); - MessageAction getNextAction(long messageId); + MessageAction getNextAction(long messageId); } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java index b3ecf3d4b..f04ea6259 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java @@ -24,284 +24,313 @@ import org.onap.ccsdk.features.lib.doorman.util.JsonUtil; public class MessageDaoImpl implements MessageDao { - private DataSource dataSource; + private DataSource dataSource; - @Override - public long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp) { - try (Connection con = dataSource.getConnection()) { - try { - con.setAutoCommit(false); - long id = 0; - String sql = "INSERT INTO message (ext_message_id, request_param, request_body, arrived_timestamp, queue_type, queue_id) VALUES (?, ?, ?, ?, ?, ?)"; - try (PreparedStatement ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { - ps.setString(1, extMessageId); - ps.setString(2, JsonUtil.dataToJson(request.param)); - ps.setString(3, request.body); - ps.setTimestamp(4, new Timestamp(timestamp.getTime())); - if (queue != null) { - ps.setString(5, queue.type); - ps.setString(6, queue.id); - } else { - ps.setNull(5, Types.VARCHAR); - ps.setNull(6, Types.VARCHAR); - } - ps.executeUpdate(); - try (ResultSet rs = ps.getGeneratedKeys()) { - rs.next(); - id = rs.getLong(1); - } - } - con.commit(); - return id; - } catch (SQLException ex) { - con.rollback(); - throw ex; - } - } catch (SQLException e) { - throw new RuntimeException("Error inserting message to DB: " + e.getMessage(), e); - } - } + @Override + public long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + long id = 0; + String sql = "INSERT INTO message (\n" + + " ext_message_id, request_param, request_body, arrived_timestamp, queue_type, queue_id)\n" + + "VALUES (?, ?, ?, ?, ?, ?)"; + try (PreparedStatement ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { + ps.setString(1, extMessageId); + ps.setString(2, JsonUtil.dataToJson(request.getParam())); + ps.setString(3, request.getBody()); + ps.setTimestamp(4, new Timestamp(timestamp.getTime())); + if (queue != null) { + ps.setString(5, queue.getType()); + ps.setString(6, queue.getId()); + } else { + ps.setNull(5, Types.VARCHAR); + ps.setNull(6, Types.VARCHAR); + } + ps.executeUpdate(); + try (ResultSet rs = ps.getGeneratedKeys()) { + rs.next(); + id = rs.getLong(1); + } + } + con.commit(); + return id; + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error inserting message to DB: " + e.getMessage(), e); + } + } - @Override - public void updateMessageStarted(long messageId, Date timestamp) { - updateMessageStatus("started_timestamp", messageId, null, timestamp); - } + @Override + public void updateMessageStarted(long messageId, Date timestamp) { + updateMessageStatus("started_timestamp", messageId, null, timestamp); + } - @Override - public void updateMessageCompleted(long messageId, String resolution, Date timestamp) { - updateMessageStatus("completed_timestamp", messageId, resolution, timestamp); - } + @Override + public void updateMessageCompleted(long messageId, String resolution, Date timestamp) { + updateMessageStatus("completed_timestamp", messageId, resolution, timestamp); + } - private void updateMessageStatus(String timestampColumn, long messageId, String resolution, Date timestamp) { - try (Connection con = dataSource.getConnection()) { - try { - con.setAutoCommit(false); - String sql = "UPDATE message SET " + timestampColumn + " = ? WHERE message_id = ?"; - try (PreparedStatement ps = con.prepareStatement(sql)) { - ps.setTimestamp(1, new Timestamp(timestamp.getTime())); - ps.setLong(2, messageId); - ps.executeUpdate(); - } - con.commit(); - } catch (SQLException ex) { - con.rollback(); - throw ex; - } - } catch (SQLException e) { - throw new RuntimeException("Error updating message status in DB: " + e.getMessage(), e); - } - } + private void updateMessageStatus(String timestampColumn, long messageId, String resolution, Date timestamp) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "UPDATE message SET " + timestampColumn + " = ? WHERE message_id = ?"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setTimestamp(1, new Timestamp(timestamp.getTime())); + ps.setLong(2, messageId); + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error updating message status in DB: " + e.getMessage(), e); + } + } - @Override - public void updateMessageResponse(long messageId, Date timestamp, MessageData response) { - try (Connection con = dataSource.getConnection()) { - try { - con.setAutoCommit(false); - String sql = "UPDATE message SET response_timestamp = ?, response_param = ?, response_body = ? WHERE message_id = ?"; - try (PreparedStatement ps = con.prepareStatement(sql)) { - ps.setTimestamp(1, new Timestamp(timestamp.getTime())); - ps.setString(2, JsonUtil.dataToJson(response.param)); - ps.setString(3, response.body); - ps.setLong(4, messageId); - ps.executeUpdate(); - } - con.commit(); - } catch (SQLException ex) { - con.rollback(); - throw ex; - } - } catch (SQLException e) { - throw new RuntimeException("Error updating message response in DB: " + e.getMessage(), e); - } - } + @Override + public void updateMessageResponse(long messageId, Date timestamp, MessageData response) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "UPDATE message SET response_timestamp = ?, response_param = ?, response_body = ?\n" + + "WHERE message_id = ?"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setTimestamp(1, new Timestamp(timestamp.getTime())); + ps.setString(2, JsonUtil.dataToJson(response.getParam())); + ps.setString(3, response.getBody()); + ps.setLong(4, messageId); + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error updating message response in DB: " + e.getMessage(), e); + } + } - @Override - public void addStatus(long messageId, MessageStatus status) { - try (Connection con = dataSource.getConnection()) { - try { - con.setAutoCommit(false); - String sql = "INSERT INTO message_status (message_id, status, status_timestamp) VALUES (?, ?, ?)"; - try (PreparedStatement ps = con.prepareStatement(sql)) { - ps.setLong(1, messageId); - ps.setString(2, status.status.toString()); - ps.setTimestamp(3, new Timestamp(status.timestamp.getTime())); - ps.executeUpdate(); - } - con.commit(); - } catch (SQLException ex) { - con.rollback(); - throw ex; - } - } catch (SQLException e) { - throw new RuntimeException("Error inserting message status to DB: " + e.getMessage(), e); - } - } + @Override + public void addStatus(long messageId, MessageStatus status) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "INSERT INTO message_status (message_id, status, status_timestamp) VALUES (?, ?, ?)"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, messageId); + ps.setString(2, status.getStatus().toString()); + ps.setTimestamp(3, new Timestamp(status.getTimestamp().getTime())); + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error inserting message status to DB: " + e.getMessage(), e); + } + } - @Override - public void addAction(long messageId, MessageAction action) { - try (Connection con = dataSource.getConnection()) { - try { - con.setAutoCommit(false); - String sql = "INSERT INTO message_action (message_id, action, action_status, resolution, action_timestamp, done_timestamp, hold_time, response_param, response_body) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; - try (PreparedStatement ps = con.prepareStatement(sql)) { - ps.setLong(1, messageId); - ps.setString(2, action.action.toString()); - ps.setString(3, action.actionStatus.toString()); - ps.setString(4, action.resolution); - ps.setTimestamp(5, new Timestamp(action.timestamp.getTime())); - if (action.doneTimestamp != null) { - ps.setTimestamp(6, new Timestamp(action.doneTimestamp.getTime())); - } else { - ps.setNull(6, Types.TIMESTAMP); - } - ps.setInt(7, action.holdTime); - if (action.returnResponse != null) { - ps.setString(8, JsonUtil.dataToJson(action.returnResponse.param)); - ps.setString(9, action.returnResponse.body); - } else { - ps.setNull(8, Types.VARCHAR); - ps.setNull(9, Types.VARCHAR); - } - ps.executeUpdate(); - } - con.commit(); - } catch (SQLException ex) { - con.rollback(); - throw ex; - } - } catch (SQLException e) { - throw new RuntimeException("Error inserting message action to DB: " + e.getMessage(), e); - } - } + @Override + public void addAction(long messageId, MessageAction action) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "INSERT INTO message_action (\n" + + " message_id, action, action_status, resolution, action_timestamp,\n" + + " done_timestamp, hold_time, response_param, response_body)\n" + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, messageId); + ps.setString(2, action.getAction().toString()); + ps.setString(3, action.getActionStatus().toString()); + ps.setString(4, action.getResolution()); + ps.setTimestamp(5, new Timestamp(action.getTimestamp().getTime())); + if (action.getDoneTimestamp() != null) { + ps.setTimestamp(6, new Timestamp(action.getDoneTimestamp().getTime())); + } else { + ps.setNull(6, Types.TIMESTAMP); + } + ps.setInt(7, action.getHoldTime()); + if (action.getReturnResponse() != null) { + ps.setString(8, JsonUtil.dataToJson(action.getReturnResponse().getParam())); + ps.setString(9, action.getReturnResponse().getBody()); + } else { + ps.setNull(8, Types.VARCHAR); + ps.setNull(9, Types.VARCHAR); + } + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error inserting message action to DB: " + e.getMessage(), e); + } + } - @Override - public void updateActionDone(long actionId, Date now) { - try (Connection con = dataSource.getConnection()) { - try { - con.setAutoCommit(false); - String sql = "UPDATE message_action SET action_status = ?, done_timestamp = ? WHERE message_action_id = ?"; - try (PreparedStatement ps = con.prepareStatement(sql)) { - ps.setString(1, ActionStatus.DONE.toString()); - ps.setTimestamp(2, new Timestamp(now.getTime())); - ps.setLong(3, actionId); - ps.executeUpdate(); - } - con.commit(); - } catch (SQLException ex) { - con.rollback(); - throw ex; - } - } catch (SQLException e) { - throw new RuntimeException("Error updating action in DB: " + e.getMessage(), e); - } - } + @Override + public void updateActionDone(long actionId, Date now) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = + "UPDATE message_action SET action_status = ?, done_timestamp = ? WHERE message_action_id = ?"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setString(1, ActionStatus.DONE.toString()); + ps.setTimestamp(2, new Timestamp(now.getTime())); + ps.setLong(3, actionId); + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error updating action in DB: " + e.getMessage(), e); + } + } - @SuppressWarnings("unchecked") - @Override - public List<Message> readMessageQueue(Queue queue) { - List<Message> messageList = new ArrayList<>(); - try (Connection con = dataSource.getConnection()) { - String msql = "SELECT * FROM message WHERE queue_type = ? AND queue_id = ?"; - String ssql = "SELECT * FROM message_status WHERE message_id = ? ORDER BY message_status_id DESC"; - String asql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY message_action_id DESC"; - try (PreparedStatement mps = con.prepareStatement(msql); PreparedStatement sps = con.prepareStatement(ssql); PreparedStatement aps = con.prepareStatement(asql)) { - mps.setString(1, queue.type); - mps.setString(2, queue.id); - try (ResultSet mrs = mps.executeQuery()) { - while (mrs.next()) { - Message m = new Message(); - m.messageId = mrs.getLong("message_id"); - m.extMessageId = mrs.getString("ext_message_id"); - m.request = new MessageData(); - m.request.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("request_param")); - m.request.body = mrs.getString("request_body"); - m.response = new MessageData(); - m.response.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("response_param")); - m.response.body = mrs.getString("response_body"); - m.queue = new Queue(); - m.queue.type = mrs.getString("queue_type"); - m.queue.id = mrs.getString("queue_id"); - m.arrivedTimestamp = mrs.getTimestamp("arrived_timestamp"); - m.startedTimestamp = mrs.getTimestamp("started_timestamp"); - m.completedTimestamp = mrs.getTimestamp("completed_timestamp"); - m.responseTimestamp = mrs.getTimestamp("response_timestamp"); - m.statusHistory = new ArrayList<>(); - m.actionHistory = new ArrayList<>(); - messageList.add(m); + @SuppressWarnings("unchecked") + @Override + public List<Message> readMessageQueue(Queue queue) { + List<Message> messageList = new ArrayList<>(); + try (Connection con = dataSource.getConnection()) { + String msql = "SELECT * FROM message WHERE queue_type = ? AND queue_id = ?"; + String ssql = "SELECT * FROM message_status WHERE message_id = ? ORDER BY message_status_id DESC"; + String asql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY message_action_id DESC"; + try (PreparedStatement mps = con.prepareStatement(msql); + PreparedStatement sps = con.prepareStatement(ssql); + PreparedStatement aps = con.prepareStatement(asql)) { + mps.setString(1, queue.getType()); + mps.setString(2, queue.getId()); + try (ResultSet mrs = mps.executeQuery()) { + while (mrs.next()) { + long messageId = mrs.getLong("message_id"); + String extMessageId = mrs.getString("ext_message_id"); - sps.setLong(1, m.messageId); - try (ResultSet srs = sps.executeQuery()) { - while (srs.next()) { - MessageStatus s = new MessageStatus(); - s.status = MessageStatusValue.valueOf(srs.getString("status")); - s.timestamp = srs.getTimestamp("status_timestamp"); - m.statusHistory.add(s); - } - } + Map<String, Object> requestParam = + (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("request_param")); + String requestBody = mrs.getString("request_body"); + MessageData request = null; + if (requestParam != null || requestBody != null) { + request = new MessageData(requestParam, requestBody); + } - aps.setLong(1, m.messageId); - try (ResultSet ars = aps.executeQuery()) { - while (ars.next()) { - MessageAction a = new MessageAction(); - a.actionId = ars.getLong("message_action_id"); - a.action = MessageActionValue.valueOf(ars.getString("action")); - a.actionStatus = ActionStatus.valueOf(ars.getString("action_status")); - a.timestamp = ars.getTimestamp("action_timestamp"); - a.doneTimestamp = ars.getTimestamp("done_timestamp"); - a.holdTime = ars.getInt("hold_time"); - a.returnResponse = new MessageData(); - a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(ars.getString("response_param")); - a.returnResponse.body = ars.getString("response_body"); - if (a.returnResponse.param == null && a.returnResponse.body == null) { - a.returnResponse = null; - } - a.resolution = ars.getString("resolution"); - m.actionHistory.add(a); - } - } - } - } - } - } catch (SQLException e) { - throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e); - } - return messageList; - } + Map<String, Object> responseParam = + (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("response_param")); + String responseBody = mrs.getString("response_body"); + MessageData response = null; + if (responseParam != null || responseBody != null) { + response = new MessageData(responseParam, responseBody); + } - @SuppressWarnings("unchecked") - @Override - public MessageAction getNextAction(long messageId) { - try (Connection con = dataSource.getConnection()) { - String sql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY action_timestamp DESC"; - try (PreparedStatement ps = con.prepareStatement(sql)) { - ps.setLong(1, messageId); - try (ResultSet rs = ps.executeQuery()) { - if (rs.next()) { - MessageAction a = new MessageAction(); - a.actionId = rs.getLong("message_action_id"); - a.action = MessageActionValue.valueOf(rs.getString("action")); - a.actionStatus = ActionStatus.valueOf(rs.getString("action_status")); - a.timestamp = rs.getTimestamp("action_timestamp"); - a.doneTimestamp = rs.getTimestamp("done_timestamp"); - a.holdTime = rs.getInt("hold_time"); - a.returnResponse = new MessageData(); - a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(rs.getString("response_param")); - a.returnResponse.body = rs.getString("response_body"); - if (a.returnResponse.param == null && a.returnResponse.body == null) { - a.returnResponse = null; - } - a.resolution = rs.getString("resolution"); - return a; - } - return null; - } - } - } catch (SQLException e) { - throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e); - } - } + Date arrivedTimestamp = mrs.getTimestamp("arrived_timestamp"); + Date startedTimestamp = mrs.getTimestamp("started_timestamp"); + Date completedTimestamp = mrs.getTimestamp("completed_timestamp"); + Date responseTimestamp = mrs.getTimestamp("response_timestamp"); - public void setDataSource(DataSource dataSource) { - this.dataSource = dataSource; - } + List<MessageStatus> statusHistory = new ArrayList<>(); + sps.setLong(1, messageId); + try (ResultSet srs = sps.executeQuery()) { + while (srs.next()) { + MessageStatusValue status = MessageStatusValue.valueOf(srs.getString("status")); + Date timestamp = srs.getTimestamp("status_timestamp"); + statusHistory.add(new MessageStatus(status, timestamp)); + } + } + + List<MessageAction> actionHistory = new ArrayList<>(); + aps.setLong(1, messageId); + try (ResultSet ars = aps.executeQuery()) { + while (ars.next()) { + long actionId = ars.getLong("message_action_id"); + MessageActionValue action = MessageActionValue.valueOf(ars.getString("action")); + ActionStatus actionStatus = ActionStatus.valueOf(ars.getString("action_status")); + String resolution = ars.getString("resolution"); + Date timestamp = ars.getTimestamp("action_timestamp"); + Date doneTimestamp = ars.getTimestamp("done_timestamp"); + Integer holdTimeO = ars.getInt("hold_time"); + int holdTime = holdTimeO != null ? holdTimeO : 0; + + Map<String, Object> returnResponseParam = + (Map<String, Object>) JsonUtil.jsonToData(ars.getString("response_param")); + String returnResponseBody = ars.getString("response_body"); + MessageData returnResponse = null; + if (returnResponseParam != null || returnResponseBody != null) { + returnResponse = new MessageData(returnResponseParam, returnResponseBody); + } + + MessageAction a = new MessageAction(actionId, action, actionStatus, resolution, + timestamp, doneTimestamp, holdTime, returnResponse); + actionHistory.add(a); + } + } + + Message m = new Message(messageId, extMessageId, request, response, arrivedTimestamp, + startedTimestamp, completedTimestamp, responseTimestamp, queue, statusHistory, + actionHistory); + messageList.add(m); + } + } + } + } catch (SQLException e) { + throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e); + } + return messageList; + } + + @SuppressWarnings("unchecked") + @Override + public MessageAction getNextAction(long messageId) { + try (Connection con = dataSource.getConnection()) { + String sql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY action_timestamp DESC"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, messageId); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + long actionId = rs.getLong("message_action_id"); + MessageActionValue action = MessageActionValue.valueOf(rs.getString("action")); + ActionStatus actionStatus = ActionStatus.valueOf(rs.getString("action_status")); + String resolution = rs.getString("resolution"); + Date timestamp = rs.getTimestamp("action_timestamp"); + Date doneTimestamp = rs.getTimestamp("done_timestamp"); + Integer holdTimeO = rs.getInt("hold_time"); + int holdTime = holdTimeO != null ? holdTimeO : 0; + + Map<String, Object> returnResponseParam = + (Map<String, Object>) JsonUtil.jsonToData(rs.getString("response_param")); + String returnResponseBody = rs.getString("response_body"); + MessageData returnResponse = null; + if (returnResponseParam != null || returnResponseBody != null) { + returnResponse = new MessageData(returnResponseParam, returnResponseBody); + } + + MessageAction a = new MessageAction(actionId, action, actionStatus, resolution, timestamp, + doneTimestamp, holdTime, returnResponse); + return a; + } + return null; + } + } + } catch (SQLException e) { + throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e); + } + } + + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java index 4aa016575..17828f453 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java @@ -1,5 +1,5 @@ package org.onap.ccsdk.features.lib.doorman.data; public enum ActionStatus { - PENDING, DONE + PENDING, DONE } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java index 9960eefbf..557539722 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java @@ -1,5 +1,5 @@ package org.onap.ccsdk.features.lib.doorman.data; public enum Event { - ARRIVED, COMPLETED, AWAKEN, CHECK + ARRIVED, COMPLETED, AWAKEN, CHECK } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java index 1f0b11384..a1a6ef22f 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java @@ -1,33 +1,112 @@ package org.onap.ccsdk.features.lib.doorman.data; +import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; public class Message { - public long messageId; - public String extMessageId; - public MessageData request; - public MessageData response; - public Date arrivedTimestamp; - public Date startedTimestamp; - public Date completedTimestamp; - public Date responseTimestamp; - public Queue queue; - - public List<MessageStatus> statusHistory; - public List<MessageAction> actionHistory; - - @Override - public String toString() { - StringBuilder ss = new StringBuilder(); - ss.append(messageId); - if (extMessageId != null) { - ss.append("::").append(extMessageId); - } - if (queue != null) { - ss.append("::").append(queue); - } - return ss.toString(); - } + private long messageId; + private String extMessageId; + private MessageData request; + private MessageData response; + private Date arrivedTimestamp; + private Date startedTimestamp; + private Date completedTimestamp; + private Date responseTimestamp; + private Queue queue; + + private List<MessageStatus> statusHistory; + private List<MessageAction> actionHistory; + + public Message(long messageId, String extMessageId, MessageData request, Queue queue) { + this.messageId = messageId; + this.extMessageId = extMessageId; + this.request = request; + this.queue = queue; + } + + public Message(long messageId, String extMessageId, MessageData request, MessageData response, + Date arrivedTimestamp, Date startedTimestamp, Date completedTimestamp, Date responseTimestamp, Queue queue, + List<MessageStatus> statusHistory, List<MessageAction> actionHistory) { + this.messageId = messageId; + this.extMessageId = extMessageId; + this.request = request; + this.response = response; + this.arrivedTimestamp = arrivedTimestamp; + this.startedTimestamp = startedTimestamp; + this.completedTimestamp = completedTimestamp; + this.responseTimestamp = responseTimestamp; + this.queue = queue; + this.statusHistory = new ArrayList<>(statusHistory); + this.actionHistory = new ArrayList<>(actionHistory); + } + + @Override + public String toString() { + StringBuilder ss = new StringBuilder(); + ss.append(messageId); + if (extMessageId != null) { + ss.append("::").append(extMessageId); + } + if (queue != null) { + ss.append("::").append(queue); + } + return ss.toString(); + } + + public int getTimeInQueue(Date now) { + // Find the elapsed time since message arrived. That will be the timestamp of the first + // status of the message (last status in the status history list) + if (statusHistory != null && !statusHistory.isEmpty()) { + MessageStatus receivedStatus = statusHistory.get(statusHistory.size() - 1); + return (int) ((now.getTime() - receivedStatus.getTimestamp().getTime()) / 1000); + } + return 0; + } + + public long getMessageId() { + return messageId; + } + + public String getExtMessageId() { + return extMessageId; + } + + public MessageData getRequest() { + return request; + } + + public MessageData getResponse() { + return response; + } + + public Date getArrivedTimestamp() { + return arrivedTimestamp; + } + + public Date getStartedTimestamp() { + return startedTimestamp; + } + + public Date getCompletedTimestamp() { + return completedTimestamp; + } + + public Date getResponseTimestamp() { + return responseTimestamp; + } + + public Queue getQueue() { + return queue; + } + + public List<MessageStatus> getStatusHistory() { + return Collections.unmodifiableList(statusHistory); + } + + public List<MessageAction> getActionHistory() { + return Collections.unmodifiableList(actionHistory); + } } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java index 319fc68ff..e9657ca25 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java @@ -4,56 +4,116 @@ import java.util.Date; public class MessageAction { - public long actionId; - public MessageActionValue action; - public ActionStatus actionStatus; - public String resolution; - public Date timestamp; - public Date doneTimestamp; - public int holdTime; // in seconds - public MessageData returnResponse; - - @Override - public String toString() { - return action.toString() + " | " + actionStatus + " | " + resolution + " | " + holdTime + " | " + returnResponse; - } - - public boolean same(MessageAction a2) { - if (action != a2.action) { - return false; - } - if (action == MessageActionValue.HOLD || action == MessageActionValue.RETURN_HOLD) { - if (holdTime != a2.holdTime) { - return false; - } - } - if (action == MessageActionValue.RETURN_COMPLETE || action == MessageActionValue.RETURN_HOLD || action == MessageActionValue.RETURN_PROCESS) { - if (returnResponse == null != (a2.returnResponse == null)) { - return false; - } - if (returnResponse != null) { - if (returnResponse.param == null != (a2.returnResponse.param == null)) { - return false; - } - if (returnResponse.param != null && !returnResponse.param.equals(a2.returnResponse.param)) { - return false; - } - if (returnResponse.body == null != (a2.returnResponse.body == null)) { - return false; - } - if (returnResponse.body != null && !returnResponse.body.equals(a2.returnResponse.body)) { - return false; - } - } - } - if (action == MessageActionValue.RETURN_COMPLETE) { - if (resolution == null != (a2.resolution == null)) { - return false; - } - if (resolution != null && !resolution.equals(a2.resolution)) { - return false; - } - } - return true; - } + private long actionId; + private MessageActionValue action; + private ActionStatus actionStatus; + private String resolution; + private Date timestamp; + private Date doneTimestamp; + private int holdTime; // in seconds + private MessageData returnResponse; + + public MessageAction(long actionId, MessageActionValue action, ActionStatus actionStatus, String resolution, + Date timestamp, Date doneTimestamp, int holdTime, MessageData returnResponse) { + this.actionId = actionId; + this.action = action; + this.actionStatus = actionStatus; + this.resolution = resolution; + this.timestamp = timestamp; + this.doneTimestamp = doneTimestamp; + this.holdTime = holdTime; + this.returnResponse = returnResponse; + } + + public MessageAction(MessageActionValue action, String resolution, int holdTime, MessageData returnResponse) { + this.action = action; + actionStatus = ActionStatus.PENDING; + this.resolution = resolution; + timestamp = new Date(); + this.holdTime = holdTime; + this.returnResponse = returnResponse; + } + + public void setDone() { + actionStatus = ActionStatus.DONE; + doneTimestamp = new Date(); + } + + @Override + public String toString() { + return action.toString() + " | " + actionStatus + " | " + resolution + " | " + holdTime + " | " + + returnResponse; + } + + public boolean same(MessageAction a2) { + if (action != a2.action) { + return false; + } + if (action == MessageActionValue.HOLD || action == MessageActionValue.RETURN_HOLD) { + if (holdTime != a2.holdTime) { + return false; + } + } + if (action == MessageActionValue.RETURN_COMPLETE || action == MessageActionValue.RETURN_HOLD + || action == MessageActionValue.RETURN_PROCESS) { + if (returnResponse == null != (a2.returnResponse == null)) { + return false; + } + if (returnResponse != null) { + if (returnResponse.getParam() == null != (a2.returnResponse.getParam() == null)) { + return false; + } + if (returnResponse.getParam() != null && !returnResponse.getParam().equals(a2.returnResponse.getParam())) { + return false; + } + if (returnResponse.getBody() == null != (a2.returnResponse.getBody() == null)) { + return false; + } + if (returnResponse.getBody() != null && !returnResponse.getBody().equals(a2.returnResponse.getBody())) { + return false; + } + } + } + if (action == MessageActionValue.RETURN_COMPLETE) { + if (resolution == null != (a2.resolution == null)) { + return false; + } + if (resolution != null && !resolution.equals(a2.resolution)) { + return false; + } + } + return true; + } + + public long getActionId() { + return actionId; + } + + public MessageActionValue getAction() { + return action; + } + + public ActionStatus getActionStatus() { + return actionStatus; + } + + public String getResolution() { + return resolution; + } + + public Date getTimestamp() { + return timestamp; + } + + public Date getDoneTimestamp() { + return doneTimestamp; + } + + public int getHoldTime() { + return holdTime; + } + + public MessageData getReturnResponse() { + return returnResponse; + } } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java index fd86d37c8..6cf47bc74 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java @@ -1,5 +1,5 @@ package org.onap.ccsdk.features.lib.doorman.data; public enum MessageActionValue { - HOLD, PROCESS, RETURN_COMPLETE, RETURN_PROCESS, RETURN_HOLD + HOLD, PROCESS, RETURN_COMPLETE, RETURN_PROCESS, RETURN_HOLD } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java index 1fa5f9a14..eabcc7729 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java @@ -1,21 +1,36 @@ package org.onap.ccsdk.features.lib.doorman.data; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; public class MessageData { - public Map<String, Object> param; - public String body; + private Map<String, Object> param; + private String body; - @Override - public String toString() { - StringBuilder ss = new StringBuilder(); - ss.append(param); - String b = body; - if (b != null && b.length() > 20) { - b = b.substring(0, 20) + "..."; - } - ss.append(b); - return ss.toString(); - } + public MessageData(Map<String, Object> param, String body) { + this.param = new HashMap<>(param); + this.body = body; + } + + @Override + public String toString() { + StringBuilder ss = new StringBuilder(); + ss.append(param); + String b = body; + if (b != null && b.length() > 20) { + b = b.substring(0, 20) + "..."; + } + ss.append(b); + return ss.toString(); + } + + public Map<String, Object> getParam() { + return Collections.unmodifiableMap(param); + } + + public String getBody() { + return body; + } } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java index e9af20be1..41dc6bf58 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java @@ -4,6 +4,24 @@ import java.util.Date; public class MessageStatus { - public MessageStatusValue status; - public Date timestamp; + private MessageStatusValue status; + private Date timestamp; + + public MessageStatus(MessageStatusValue status, Date timestamp) { + this.status = status; + this.timestamp = timestamp; + } + + public MessageStatus(MessageStatusValue status) { + this.status = status; + timestamp = new Date(); + } + + public MessageStatusValue getStatus() { + return status; + } + + public Date getTimestamp() { + return timestamp; + } } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java index af2bbf024..cd0257267 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java @@ -1,5 +1,5 @@ package org.onap.ccsdk.features.lib.doorman.data; public enum MessageStatusValue { - ARRIVED, IN_QUEUE, PROCESSING_SYNC, PROCESSING_ASYNC, COMPLETED + ARRIVED, IN_QUEUE, PROCESSING_SYNC, PROCESSING_ASYNC, COMPLETED } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java index 7fd83104a..cfdf137a2 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java @@ -2,11 +2,24 @@ package org.onap.ccsdk.features.lib.doorman.data; public class Queue { - public String type; - public String id; + private String type; + private String id; - @Override - public String toString() { - return type + "::" + id; - } + public Queue(String type, String id) { + this.type = type; + this.id = id; + } + + @Override + public String toString() { + return type + "::" + id; + } + + public String getType() { + return type; + } + + public String getId() { + return id; + } } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java index a81942771..125c3d089 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java @@ -13,345 +13,342 @@ import org.onap.ccsdk.features.lib.doorman.data.Message; import org.onap.ccsdk.features.lib.doorman.data.MessageAction; import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue; import org.onap.ccsdk.features.lib.doorman.data.MessageData; -import org.onap.ccsdk.features.lib.doorman.data.MessageStatus; import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MessageHandlerBaseImpl implements MessageQueueHandler { - private static final Logger log = LoggerFactory.getLogger(MessageHandlerBaseImpl.class); - - private boolean async = false; - private int maxParallelCount = 1; - private int maxQueueSize = 0; - private int timeToLive = 0; // in seconds - private int maxTimeInQueue = 60; // in seconds - private int updateWaitTime = 60; // in seconds - - @Override - public Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue) { - long t1 = System.currentTimeMillis(); - log.info(">>>>> Handler started for message: " + msg + ": " + event); - - PrioritizedMessage pmsg = null; - List<PrioritizedMessage> processing = new ArrayList<>(); - List<PrioritizedMessage> waiting = new ArrayList<>(); - Date now = new Date(); - for (Message m : queue) { - PrioritizedMessage pm = new PrioritizedMessage(); - pm.message = m; - pm.priority = assignPriority(msg); - pm.timeInQueue = getTimeInQueue(now, m); - pm.updateGroup = determineUpdateGroup(m); - pm.updateSequence = determineUpdateSequence(m); - - if (pm.message.messageId == msg.messageId) { - pmsg = pm; - - if (event != Event.COMPLETED) { - waiting.add(pm); - } - } else { - MessageStatusValue s = m.statusHistory.get(0).status; - if (s == MessageStatusValue.IN_QUEUE) { - waiting.add(pm); - } else if (s == MessageStatusValue.PROCESSING_SYNC || s == MessageStatusValue.PROCESSING_ASYNC) { - processing.add(pm); - } - } - } - - log.info(" Processing:"); - for (PrioritizedMessage pm : processing) { - log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup + " | " + pm.updateSequence); - } - log.info(" Waiting:"); - for (PrioritizedMessage pm : waiting) { - log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup + " | " + pm.updateSequence); - } - - log.info(" Determined actions:"); - - Collections.sort(waiting, (pm1, pm2) -> comparePrioritizedMessages(pm1, pm2)); - - Map<Long, MessageAction> mm = new HashMap<>(); - - List<PrioritizedMessage> skipList = findMessagesToSkip(processing, waiting); - addNextActionComplete(mm, skipList, Resolution.SKIPPED, now); - - waiting.removeAll(skipList); - - List<PrioritizedMessage> expiredList = findExpiredMessages(processing, waiting); - addNextActionComplete(mm, expiredList, Resolution.EXPIRED, now); - - waiting.removeAll(expiredList); - - List<PrioritizedMessage> dropList = findMessagesToDrop(processing, waiting); - addNextActionComplete(mm, dropList, Resolution.DROPPED, now); - - waiting.removeAll(dropList); - - List<PrioritizedMessage> processList = findMessagesToProcess(processing, waiting); - for (PrioritizedMessage pm : processList) { - MessageAction a = new MessageAction(); - a.timestamp = now; - if (async) { - a.action = MessageActionValue.RETURN_PROCESS; - a.returnResponse = ackResponse(pm.message); - } else { - a.action = MessageActionValue.PROCESS; - } - mm.put(pm.message.messageId, a); - log.info(" -- Next action for message: " + pm.message + ": " + a.action); - } - - if (event != Event.COMPLETED && !mm.containsKey(pmsg.message.messageId)) { - MessageAction a = new MessageAction(); - a.timestamp = now; - a.holdTime = pmsg.updateGroup != null ? updateWaitTime : maxTimeInQueue; - if (async) { - a.action = MessageActionValue.RETURN_HOLD; - a.returnResponse = ackResponse(pmsg.message); - } else { - a.action = MessageActionValue.HOLD; - } - mm.put(pmsg.message.messageId, a); - log.info(" -- Next action for message: " + pmsg.message + ": " + a.action + ": " + a.holdTime); - - waiting.remove(pmsg); - } - - if (event == Event.COMPLETED) { - MessageAction a = new MessageAction(); - a.timestamp = now; - a.action = MessageActionValue.RETURN_COMPLETE; - a.resolution = Resolution.PROCESSED.toString(); - a.returnResponse = completeResponse(Resolution.PROCESSED, msg); - mm.put(pmsg.message.messageId, a); - log.info(" -- Next action for message: " + pmsg.message + ": " + a.action + ": " + a.resolution); - } - - long t2 = System.currentTimeMillis(); - log.info("<<<<< Handler completed for message: " + msg + ": " + event + ": Time: " + (t2 - t1)); - return mm; - } - - private void addNextActionComplete(Map<Long, MessageAction> mm, List<PrioritizedMessage> skipList, Resolution r, Date now) { - for (PrioritizedMessage pm : skipList) { - MessageAction a = new MessageAction(); - a.action = MessageActionValue.RETURN_COMPLETE; - a.resolution = r.toString(); - a.timestamp = now; - a.returnResponse = completeResponse(r, pm.message); - mm.put(pm.message.messageId, a); - log.info(" -- Next action for message: " + pm.message + ": " + a.action + ": " + a.resolution); - } - } - - protected List<PrioritizedMessage> findMessagesToSkip(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) { - List<PrioritizedMessage> ll = new ArrayList<>(); - Map<String, PrioritizedMessage> lastMap = new HashMap<>(); - for (PrioritizedMessage pm : waiting) { - if (pm.updateGroup != null) { - PrioritizedMessage last = lastMap.get(pm.updateGroup); - if (last == null) { - lastMap.put(pm.updateGroup, pm); - } else { - if (pm.updateSequence > last.updateSequence) { - ll.add(last); - lastMap.put(pm.updateGroup, pm); - } - } - } - } - return ll; - } - - protected List<PrioritizedMessage> findExpiredMessages(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) { - List<PrioritizedMessage> ll = new ArrayList<>(); - if (timeToLive > 0) { - for (PrioritizedMessage pm : waiting) { - if (pm.timeInQueue > timeToLive) { - ll.add(pm); - } - } - } - return ll; - } - - protected List<PrioritizedMessage> findMessagesToDrop(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) { - List<PrioritizedMessage> ll = new ArrayList<>(); - if (maxQueueSize > 0) { - // Drop the least important messages (last in the prioritized waiting list) - for (int i = maxQueueSize; i < waiting.size(); i++) { - ll.add(waiting.get(i)); - } - } - return ll; - } - - protected List<PrioritizedMessage> findMessagesToProcess(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) { - List<PrioritizedMessage> ll = new ArrayList<>(); - - if (processing.size() >= maxParallelCount) { - return ll; - } - - // Find messages allowed to be processed based on the concurrency rules - - List<List<String>> currentLocks = new ArrayList<>(); - for (PrioritizedMessage m : processing) { - List<List<String>> locks = determineConcurency(m.message); - if (locks != null) { - currentLocks.addAll(locks); - } - } - - List<PrioritizedMessage> allowed = new ArrayList<>(); - for (PrioritizedMessage m : waiting) { - List<List<String>> neededLocks = determineConcurency(m.message); - if (allowed(currentLocks, neededLocks)) { - allowed.add(m); - } - } - - // Remove messages that are waiting on hold - Iterator<PrioritizedMessage> ii = allowed.iterator(); - while (ii.hasNext()) { - PrioritizedMessage pm = ii.next(); - if (pm.updateGroup != null) { - log.info(" --- Check: " + pm.message + ": " + pm.timeInQueue + " >= " + updateWaitTime); - if (pm.timeInQueue < updateWaitTime) { - ii.remove(); - } - } - } - - // Limit the number of processing messages to maxParallelCount - - for (int i = 0; i < allowed.size() && i < maxParallelCount - processing.size(); i++) { - ll.add(allowed.get(i)); - } - - return ll; - } - - private int comparePrioritizedMessages(PrioritizedMessage pm1, PrioritizedMessage pm2) { - if (pm1.timeInQueue >= maxTimeInQueue && pm2.timeInQueue < maxTimeInQueue) { - return -1; - } - if (pm1.timeInQueue < maxTimeInQueue && pm2.timeInQueue >= maxTimeInQueue) { - return 1; - } - if (pm1.priority < pm2.priority) { - return -1; - } - if (pm1.priority > pm2.priority) { - return 1; - } - if (pm1.timeInQueue > pm2.timeInQueue) { - return -1; - } - if (pm1.timeInQueue < pm2.timeInQueue) { - return 1; - } - return 0; - } - - private int getTimeInQueue(Date now, Message m) { - // Find the elapsed time since message arrived. That will be the timestamp of the first - // status of the message (last status in the status history list) - MessageStatus receivedStatus = m.statusHistory.get(m.statusHistory.size() - 1); - return (int) ((now.getTime() - receivedStatus.timestamp.getTime()) / 1000); - } - - private boolean allowed(List<List<String>> currentLocks, List<List<String>> neededLocks) { - if (neededLocks == null) { - return true; - } - for (List<String> neededLockLevels : neededLocks) { - for (List<String> currentLockLevels : currentLocks) { - int n = neededLockLevels.size() < currentLockLevels.size() ? neededLockLevels.size() : currentLockLevels.size(); - boolean good = false; - for (int i = 0; i < n; i++) { - if (!neededLockLevels.get(i).equals(currentLockLevels.get(i))) { - good = true; - break; - } - } - if (!good) { - return false; - } - } - } - return true; - } - - protected void initMessage(Message msg) { - } - - protected void closeMessage(Message msg) { - } - - protected long assignPriority(Message msg) { - return msg.messageId; // FIFO by default - } - - protected String determineUpdateGroup(Message msg) { - return null; - } - - protected long determineUpdateSequence(Message msg) { - return msg.messageId; - } - - protected List<List<String>> determineConcurency(Message msg) { - return null; - } - - protected MessageData ackResponse(Message msg) { - return null; - } - - protected MessageData completeResponse(Resolution r, Message msg) { - return null; - } - - protected static enum Resolution { - PROCESSED, SKIPPED, EXPIRED, DROPPED - } - - protected static class PrioritizedMessage { - - public Message message; - public long priority; - public int timeInQueue; - public String updateGroup; - public long updateSequence; - } - - public void setMaxParallelCount(int maxParallelCount) { - this.maxParallelCount = maxParallelCount; - } - - public void setTimeToLive(int timeToLive) { - this.timeToLive = timeToLive; - } - - public void setMaxTimeInQueue(int maxTimeInQueue) { - this.maxTimeInQueue = maxTimeInQueue; - } - - public void setUpdateWaitTime(int updateWaitTime) { - this.updateWaitTime = updateWaitTime; - } - - public void setMaxQueueSize(int maxQueueSize) { - this.maxQueueSize = maxQueueSize; - } - - public void setAsync(boolean async) { - this.async = async; - } + private static final Logger log = LoggerFactory.getLogger(MessageHandlerBaseImpl.class); + + private boolean async = false; + private int maxParallelCount = 1; + private int maxQueueSize = 0; + private int timeToLive = 0; // in seconds + private int maxTimeInQueue = 60; // in seconds + private int updateWaitTime = 60; // in seconds + + @Override + public Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue) { + long t1 = System.currentTimeMillis(); + log.info(">>>>> Handler started for message: " + msg + ": " + event); + + PrioritizedMessage pmsg = null; + List<PrioritizedMessage> processing = new ArrayList<>(); + List<PrioritizedMessage> waiting = new ArrayList<>(); + Date now = new Date(); + for (Message m : queue) { + PrioritizedMessage pm = new PrioritizedMessage(); + pm.message = m; + pm.priority = assignPriority(msg); + pm.timeInQueue = m.getTimeInQueue(now); + pm.updateGroup = determineUpdateGroup(m); + pm.updateSequence = determineUpdateSequence(m); + + if (pm.message.getMessageId() == msg.getMessageId()) { + pmsg = pm; + + if (event != Event.COMPLETED) { + waiting.add(pm); + } + } else { + MessageStatusValue s = m.getStatusHistory().get(0).getStatus(); + if (s == MessageStatusValue.IN_QUEUE) { + waiting.add(pm); + } else if (s == MessageStatusValue.PROCESSING_SYNC || s == MessageStatusValue.PROCESSING_ASYNC) { + processing.add(pm); + } + } + } + + log.info(" Processing:"); + for (PrioritizedMessage pm : processing) { + log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup + + " | " + pm.updateSequence); + } + log.info(" Waiting:"); + for (PrioritizedMessage pm : waiting) { + log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup + + " | " + pm.updateSequence); + } + + log.info(" Determined actions:"); + + Collections.sort(waiting, (pm1, pm2) -> comparePrioritizedMessages(pm1, pm2)); + + Map<Long, MessageAction> mm = new HashMap<>(); + + List<PrioritizedMessage> skipList = findMessagesToSkip(processing, waiting); + addNextActionComplete(mm, skipList, Resolution.SKIPPED, now); + + waiting.removeAll(skipList); + + List<PrioritizedMessage> expiredList = findExpiredMessages(processing, waiting); + addNextActionComplete(mm, expiredList, Resolution.EXPIRED, now); + + waiting.removeAll(expiredList); + + List<PrioritizedMessage> dropList = findMessagesToDrop(processing, waiting); + addNextActionComplete(mm, dropList, Resolution.DROPPED, now); + + waiting.removeAll(dropList); + + List<PrioritizedMessage> processList = findMessagesToProcess(processing, waiting); + for (PrioritizedMessage pm : processList) { + MessageActionValue action = MessageActionValue.PROCESS; + MessageData returnResponse = null; + if (async) { + action = MessageActionValue.RETURN_PROCESS; + returnResponse = ackResponse(pm.message); + } + MessageAction a = new MessageAction(action, null, 0, returnResponse); + mm.put(pm.message.getMessageId(), a); + log.info(" -- Next action for message: " + pm.message + ": " + a.getAction()); + } + + if (event != Event.COMPLETED && !mm.containsKey(pmsg.message.getMessageId())) { + MessageActionValue action = MessageActionValue.HOLD; + int holdTime = pmsg.updateGroup != null ? updateWaitTime : maxTimeInQueue; + MessageData returnResponse = null; + if (async) { + action = MessageActionValue.RETURN_HOLD; + returnResponse = ackResponse(pmsg.message); + } + MessageAction a = new MessageAction(action, null, holdTime, returnResponse); + mm.put(pmsg.message.getMessageId(), a); + log.info(" -- Next action for message: " + pmsg.message + ": " + a.getAction() + ": " + + a.getHoldTime()); + + waiting.remove(pmsg); + } + + if (event == Event.COMPLETED) { + MessageActionValue action = MessageActionValue.RETURN_COMPLETE; + String resolution = Resolution.PROCESSED.toString(); + MessageData returnResponse = completeResponse(Resolution.PROCESSED, msg); + MessageAction a = new MessageAction(action, resolution, 0, returnResponse); + mm.put(pmsg.message.getMessageId(), a); + log.info(" -- Next action for message: " + pmsg.message + ": " + a.getAction() + ": " + + a.getResolution()); + } + + long t2 = System.currentTimeMillis(); + log.info("<<<<< Handler completed for message: " + msg + ": " + event + ": Time: " + (t2 - t1)); + return mm; + } + + private void addNextActionComplete(Map<Long, MessageAction> mm, List<PrioritizedMessage> skipList, Resolution r, + Date now) { + for (PrioritizedMessage pm : skipList) { + MessageActionValue action = MessageActionValue.RETURN_COMPLETE; + String resolution = r.toString(); + MessageData returnResponse = completeResponse(r, pm.message); + MessageAction a = new MessageAction(action, resolution, 0, returnResponse); + mm.put(pm.message.getMessageId(), a); + log.info(" -- Next action for message: " + pm.message + ": " + a.getAction() + ": " + + a.getResolution()); + } + } + + protected List<PrioritizedMessage> findMessagesToSkip(List<PrioritizedMessage> processing, + List<PrioritizedMessage> waiting) { + List<PrioritizedMessage> ll = new ArrayList<>(); + Map<String, PrioritizedMessage> lastMap = new HashMap<>(); + for (PrioritizedMessage pm : waiting) { + if (pm.updateGroup != null) { + PrioritizedMessage last = lastMap.get(pm.updateGroup); + if (last == null) { + lastMap.put(pm.updateGroup, pm); + } else { + if (pm.updateSequence > last.updateSequence) { + ll.add(last); + lastMap.put(pm.updateGroup, pm); + } + } + } + } + return ll; + } + + protected List<PrioritizedMessage> findExpiredMessages(List<PrioritizedMessage> processing, + List<PrioritizedMessage> waiting) { + List<PrioritizedMessage> ll = new ArrayList<>(); + if (timeToLive > 0) { + for (PrioritizedMessage pm : waiting) { + if (pm.timeInQueue > timeToLive) { + ll.add(pm); + } + } + } + return ll; + } + + protected List<PrioritizedMessage> findMessagesToDrop(List<PrioritizedMessage> processing, + List<PrioritizedMessage> waiting) { + List<PrioritizedMessage> ll = new ArrayList<>(); + if (maxQueueSize > 0) { + // Drop the least important messages (last in the prioritized waiting list) + for (int i = maxQueueSize; i < waiting.size(); i++) { + ll.add(waiting.get(i)); + } + } + return ll; + } + + protected List<PrioritizedMessage> findMessagesToProcess(List<PrioritizedMessage> processing, + List<PrioritizedMessage> waiting) { + List<PrioritizedMessage> ll = new ArrayList<>(); + + if (processing.size() >= maxParallelCount) { + return ll; + } + + // Find messages allowed to be processed based on the concurrency rules + + List<List<String>> currentLocks = new ArrayList<>(); + for (PrioritizedMessage m : processing) { + List<List<String>> locks = determineConcurency(m.message); + if (locks != null) { + currentLocks.addAll(locks); + } + } + + List<PrioritizedMessage> allowed = new ArrayList<>(); + for (PrioritizedMessage m : waiting) { + List<List<String>> neededLocks = determineConcurency(m.message); + if (allowed(currentLocks, neededLocks)) { + allowed.add(m); + } + } + + // Remove messages that are waiting on hold + Iterator<PrioritizedMessage> ii = allowed.iterator(); + while (ii.hasNext()) { + PrioritizedMessage pm = ii.next(); + if (pm.updateGroup != null) { + log.info(" --- Check: " + pm.message + ": " + pm.timeInQueue + " >= " + updateWaitTime); + if (pm.timeInQueue < updateWaitTime) { + ii.remove(); + } + } + } + + // Limit the number of processing messages to maxParallelCount + + for (int i = 0; i < allowed.size() && i < maxParallelCount - processing.size(); i++) { + ll.add(allowed.get(i)); + } + + return ll; + } + + private int comparePrioritizedMessages(PrioritizedMessage pm1, PrioritizedMessage pm2) { + if (pm1.timeInQueue >= maxTimeInQueue && pm2.timeInQueue < maxTimeInQueue) { + return -1; + } + if (pm1.timeInQueue < maxTimeInQueue && pm2.timeInQueue >= maxTimeInQueue) { + return 1; + } + if (pm1.priority < pm2.priority) { + return -1; + } + if (pm1.priority > pm2.priority) { + return 1; + } + if (pm1.timeInQueue > pm2.timeInQueue) { + return -1; + } + if (pm1.timeInQueue < pm2.timeInQueue) { + return 1; + } + return 0; + } + + private boolean allowed(List<List<String>> currentLocks, List<List<String>> neededLocks) { + if (neededLocks == null) { + return true; + } + for (List<String> neededLockLevels : neededLocks) { + for (List<String> currentLockLevels : currentLocks) { + int n = neededLockLevels.size() < currentLockLevels.size() ? neededLockLevels.size() + : currentLockLevels.size(); + boolean good = false; + for (int i = 0; i < n; i++) { + if (!neededLockLevels.get(i).equals(currentLockLevels.get(i))) { + good = true; + break; + } + } + if (!good) { + return false; + } + } + } + return true; + } + + protected void initMessage(Message msg) {} + + protected void closeMessage(Message msg) {} + + protected long assignPriority(Message msg) { + return msg.getMessageId(); // FIFO by default + } + + protected String determineUpdateGroup(Message msg) { + return null; + } + + protected long determineUpdateSequence(Message msg) { + return msg.getMessageId(); // Order of receiving by default + } + + protected List<List<String>> determineConcurency(Message msg) { + return null; + } + + protected MessageData ackResponse(Message msg) { + return null; + } + + protected MessageData completeResponse(Resolution r, Message msg) { + return null; + } + + protected static enum Resolution { + PROCESSED, SKIPPED, EXPIRED, DROPPED + } + + protected static class PrioritizedMessage { + + public Message message; + public long priority; + public int timeInQueue; + public String updateGroup; + public long updateSequence; + } + + public void setMaxParallelCount(int maxParallelCount) { + this.maxParallelCount = maxParallelCount; + } + + public void setTimeToLive(int timeToLive) { + this.timeToLive = timeToLive; + } + + public void setMaxTimeInQueue(int maxTimeInQueue) { + this.maxTimeInQueue = maxTimeInQueue; + } + + public void setUpdateWaitTime(int updateWaitTime) { + this.updateWaitTime = updateWaitTime; + } + + public void setMaxQueueSize(int maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } + + public void setAsync(boolean async) { + this.async = async; + } } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java index 61059e067..44897ee15 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java @@ -11,47 +11,47 @@ import org.onap.ccsdk.features.lib.rlock.LockHelper; public class MessageInterceptorFactoryImpl implements MessageInterceptorFactory { - private MessageClassifier messageClassifier; - private Map<String, MessageQueueHandler> handlerMap; - private MessageProcessor messageProcessor; - private MessageDao messageDao; - - private LockHelper lockHelper; - private int lockTimeout; // in seconds - - @Override - public MessageInterceptor create() { - MessageInterceptorImpl mi = new MessageInterceptorImpl(); - mi.setMessageClassifier(messageClassifier); - mi.setHandlerMap(handlerMap); - mi.setMessageProcessor(messageProcessor); - mi.setMessageDao(messageDao); - mi.setLockHelper(lockHelper); - mi.setLockTimeout(lockTimeout); - return mi; - } - - public void setMessageDao(MessageDao messageDao) { - this.messageDao = messageDao; - } - - public void setLockHelper(LockHelper lockHelper) { - this.lockHelper = lockHelper; - } - - public void setLockTimeout(int lockTimeout) { - this.lockTimeout = lockTimeout; - } - - public void setMessageClassifier(MessageClassifier messageClassifier) { - this.messageClassifier = messageClassifier; - } - - public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) { - this.handlerMap = handlerMap; - } - - public void setMessageProcessor(MessageProcessor messageProcessor) { - this.messageProcessor = messageProcessor; - } + private MessageClassifier messageClassifier; + private Map<String, MessageQueueHandler> handlerMap; + private MessageProcessor messageProcessor; + private MessageDao messageDao; + + private LockHelper lockHelper; + private int lockTimeout; // in seconds + + @Override + public MessageInterceptor create() { + MessageInterceptorImpl mi = new MessageInterceptorImpl(); + mi.setMessageClassifier(messageClassifier); + mi.setHandlerMap(handlerMap); + mi.setMessageProcessor(messageProcessor); + mi.setMessageDao(messageDao); + mi.setLockHelper(lockHelper); + mi.setLockTimeout(lockTimeout); + return mi; + } + + public void setMessageDao(MessageDao messageDao) { + this.messageDao = messageDao; + } + + public void setLockHelper(LockHelper lockHelper) { + this.lockHelper = lockHelper; + } + + public void setLockTimeout(int lockTimeout) { + this.lockTimeout = lockTimeout; + } + + public void setMessageClassifier(MessageClassifier messageClassifier) { + this.messageClassifier = messageClassifier; + } + + public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) { + this.handlerMap = handlerMap; + } + + public void setMessageProcessor(MessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor; + } } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java index d2ab97101..89f29b327 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java @@ -25,312 +25,298 @@ import org.slf4j.LoggerFactory; public class MessageInterceptorImpl implements MessageInterceptor { - private static final Logger log = LoggerFactory.getLogger(MessageInterceptorImpl.class); - - private MessageClassifier messageClassifier; - private Map<String, MessageQueueHandler> handlerMap; - private MessageProcessor messageProcessor; - private MessageDao messageDao; - - private LockHelper lockHelper; - private int lockTimeout = 10; // in seconds - - private Message message = null; - private MessageQueueHandler handler = null; - - @Override - public MessageData processRequest(MessageData request) { - Date now = new Date(); - - Queue q = null; - String extMessageId = null; - if (messageClassifier != null) { - q = messageClassifier.determineQueue(request); - extMessageId = messageClassifier.getExtMessageId(request); - } - - long id = messageDao.addArrivedMessage(extMessageId, request, q, now); - - MessageStatus arrivedStatus = new MessageStatus(); - arrivedStatus.status = MessageStatusValue.ARRIVED; - arrivedStatus.timestamp = now; - messageDao.addStatus(id, arrivedStatus); - - message = new Message(); - message.messageId = id; - message.request = request; - message.arrivedTimestamp = now; - message.queue = q; - message.extMessageId = extMessageId; - - log.info("Message received: " + message); - - if (q != null && handlerMap != null) { - handler = handlerMap.get(q.type); - } - - if (q == null || handler == null) { - processSync(); - return null; // Do nothing, normal message processing - } - - Event event = Event.ARRIVED; - - while (true) { - MessageFunction func = new MessageFunction(event); - func.exec(); - - MessageAction nextAction = func.getNextAction(); - if (nextAction == null) { - processSync(); - return null; - } - - switch (nextAction.action) { - - case PROCESS: - processSync(); - return null; - - case HOLD: { - event = waitForNewAction(nextAction.holdTime); - break; - } - - case RETURN_COMPLETE: - returnComplete(nextAction); - return nextAction.returnResponse; - - case RETURN_PROCESS: - processAsync(nextAction.returnResponse); - return nextAction.returnResponse; - - case RETURN_HOLD: { - returnHold(nextAction); - return nextAction.returnResponse; - } - } - } - } - - private void processSync() { - messageDao.updateMessageStarted(message.messageId, new Date()); - log.info("Message processing started: " + message); - } - - private void processAsync(MessageData returnResponse) { - Thread t = new Thread(() -> processMessage(message.request), message.queue.type + "::" + message.queue.id + "::" + message.messageId); - t.start(); - - messageDao.updateMessageResponse(message.messageId, new Date(), returnResponse); - } - - private void processMessage(MessageData request) { - messageDao.updateMessageStarted(message.messageId, new Date()); - log.info("Message processing started: " + message); - if (messageProcessor != null) { - messageProcessor.processMessage(request); - } - - MessageFunction func = new MessageFunction(Event.COMPLETED); - func.exec(); - MessageAction nextAction = func.getNextAction(); - - messageDao.updateMessageCompleted(message.messageId, nextAction.resolution, new Date()); - log.info("Message processing completed: " + message); - } - - private void returnComplete(MessageAction nextAction) { - Date now = new Date(); - messageDao.updateMessageResponse(message.messageId, now, nextAction.returnResponse); - messageDao.updateMessageCompleted(message.messageId, nextAction.resolution, now); - log.info("Message processing completed: " + message); - } - - private void returnHold(MessageAction nextAction) { - Thread t = new Thread(() -> asyncQueue(nextAction), message.queue.type + "::" + message.queue.id + "::" + message.messageId); - t.start(); - messageDao.updateMessageResponse(message.messageId, new Date(), nextAction.returnResponse); - } - - private void asyncQueue(MessageAction nextAction) { - Event event = waitForNewAction(nextAction.holdTime); - - while (true) { - MessageFunction func = new MessageFunction(event); - func.exec(); - - nextAction = func.getNextAction(); - if (nextAction == null) { - processMessage(message.request); - return; - } - - switch (nextAction.action) { - case PROCESS: - processMessage(message.request); - return; - - case HOLD: { - event = waitForNewAction(nextAction.holdTime); - break; - } - - default: - return; - } - } - } - - private Event waitForNewAction(int holdTime) { - long startTime = System.currentTimeMillis(); - long currentTime = startTime; - while (currentTime - startTime <= (holdTime + 1) * 1000) { - try { - Thread.sleep(5000); - } catch (Exception e) { - } - - MessageAction nextAction = messageDao.getNextAction(message.messageId); - if (nextAction != null && nextAction.action != MessageActionValue.HOLD) { - return Event.AWAKEN; - } - - currentTime = System.currentTimeMillis(); - } - return Event.CHECK; - } - - @Override - public void processResponse(MessageData response) { - if (message == null) { - return; - } - - String resolution = null; - if (message.queue != null && handler != null) { - MessageFunction func = new MessageFunction(Event.COMPLETED); - func.exec(); - MessageAction nextAction = func.getNextAction(); - if (nextAction != null) { - resolution = nextAction.resolution; - } - } - - Date now = new Date(); - messageDao.updateMessageResponse(message.messageId, now, response); - messageDao.updateMessageCompleted(message.messageId, resolution, now); - log.info("Message processing completed: " + message); - } - - private class MessageFunction extends SynchronizedFunction { - - private Event event; - - private MessageAction nextAction = null; // Output - - public MessageFunction(Event event) { - super(lockHelper, Collections.singleton(message.queue.type + "::" + message.queue.id), lockTimeout); - this.event = event; - } - - @Override - protected void _exec() { - List<Message> messageQueue = messageDao.readMessageQueue(message.queue); - if (event == Event.AWAKEN) { - for (Message m : messageQueue) { - if (m.messageId == message.messageId) { - nextAction = m.actionHistory.get(0); - } - } - if (nextAction != null) { - messageDao.updateActionDone(nextAction.actionId, new Date()); - } - } else { - Map<Long, MessageAction> nextActionMap = handler.nextAction(event, message, messageQueue); - if (nextActionMap != null) { - for (Message m : messageQueue) { - MessageAction action = nextActionMap.get(m.messageId); - if (action != null) { - if (m.messageId == message.messageId) { - action.actionStatus = ActionStatus.DONE; - action.doneTimestamp = new Date(); - messageDao.addAction(m.messageId, action); - nextAction = action; - } else { - MessageAction lastAction = m.actionHistory.get(0); - if (lastAction.actionStatus != ActionStatus.PENDING || !action.same(lastAction)) { - action.actionStatus = ActionStatus.PENDING; - messageDao.addAction(m.messageId, action); - } - } - } - } - } - } - if (nextAction != null) { - log.info("Next message action: " + message + ":" + nextAction.action); - MessageStatus status = determineStatus(nextAction); - if (status != null) { - messageDao.addStatus(message.messageId, status); - log.info("Updating message status: " + message + ":" + status.status); - } - } - } - - public MessageAction getNextAction() { - return nextAction; - } - } - - private MessageStatus determineStatus(MessageAction action) { - if (action == null) { - return null; - } - - MessageStatus s = new MessageStatus(); - s.timestamp = new Date(); - - switch (action.action) { - case PROCESS: - s.status = MessageStatusValue.PROCESSING_SYNC; - break; - case HOLD: - case RETURN_HOLD: - s.status = MessageStatusValue.IN_QUEUE; - break; - case RETURN_PROCESS: - s.status = MessageStatusValue.PROCESSING_ASYNC; - break; - case RETURN_COMPLETE: - s.status = MessageStatusValue.COMPLETED; - break; - } - - return s; - } - - public void setMessageDao(MessageDao messageDao) { - this.messageDao = messageDao; - } - - public void setLockHelper(LockHelper lockHelper) { - this.lockHelper = lockHelper; - } - - public void setLockTimeout(int lockTimeout) { - this.lockTimeout = lockTimeout; - } - - public void setMessageClassifier(MessageClassifier messageClassifier) { - this.messageClassifier = messageClassifier; - } - - public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) { - this.handlerMap = handlerMap; - } - - public void setMessageProcessor(MessageProcessor messageProcessor) { - this.messageProcessor = messageProcessor; - } + private static final Logger log = LoggerFactory.getLogger(MessageInterceptorImpl.class); + + private MessageClassifier messageClassifier; + private Map<String, MessageQueueHandler> handlerMap; + private MessageProcessor messageProcessor; + private MessageDao messageDao; + + private LockHelper lockHelper; + private int lockTimeout = 10; // in seconds + + private Message message = null; + private MessageQueueHandler handler = null; + + @Override + public MessageData processRequest(MessageData request) { + Date now = new Date(); + + Queue queue = null; + String extMessageId = null; + if (messageClassifier != null) { + queue = messageClassifier.determineQueue(request); + extMessageId = messageClassifier.getExtMessageId(request); + } + + long id = messageDao.addArrivedMessage(extMessageId, request, queue, now); + + MessageStatus arrivedStatus = new MessageStatus(MessageStatusValue.ARRIVED, now); + messageDao.addStatus(id, arrivedStatus); + + message = new Message(id, extMessageId, request, queue); + + log.info("Message received: " + message); + + if (queue != null && handlerMap != null) { + handler = handlerMap.get(queue.getType()); + } + + if (queue == null || handler == null) { + processSync(); + return null; // Do nothing, normal message processing + } + + Event event = Event.ARRIVED; + + while (true) { + MessageFunction func = new MessageFunction(event); + func.exec(); + + MessageAction nextAction = func.getNextAction(); + if (nextAction == null) { + processSync(); + return null; + } + + switch (nextAction.getAction()) { + + case PROCESS: + processSync(); + return null; + + case HOLD: { + event = waitForNewAction(nextAction.getHoldTime()); + break; + } + + case RETURN_COMPLETE: + returnComplete(nextAction); + return nextAction.getReturnResponse(); + + case RETURN_PROCESS: + processAsync(nextAction.getReturnResponse()); + return nextAction.getReturnResponse(); + + case RETURN_HOLD: { + returnHold(nextAction); + return nextAction.getReturnResponse(); + } + } + } + } + + private void processSync() { + messageDao.updateMessageStarted(message.getMessageId(), new Date()); + log.info("Message processing started: " + message); + } + + private void processAsync(MessageData returnResponse) { + Thread t = new Thread(() -> processMessage(message.getRequest()), + message.getQueue().getType() + "::" + message.getQueue().getId() + "::" + message.getMessageId()); + t.start(); + + messageDao.updateMessageResponse(message.getMessageId(), new Date(), returnResponse); + } + + private void processMessage(MessageData request) { + messageDao.updateMessageStarted(message.getMessageId(), new Date()); + log.info("Message processing started: " + message); + if (messageProcessor != null) { + messageProcessor.processMessage(request); + } + + MessageFunction func = new MessageFunction(Event.COMPLETED); + func.exec(); + MessageAction nextAction = func.getNextAction(); + + messageDao.updateMessageCompleted(message.getMessageId(), nextAction.getResolution(), new Date()); + log.info("Message processing completed: " + message); + } + + private void returnComplete(MessageAction nextAction) { + Date now = new Date(); + messageDao.updateMessageResponse(message.getMessageId(), now, nextAction.getReturnResponse()); + messageDao.updateMessageCompleted(message.getMessageId(), nextAction.getResolution(), now); + log.info("Message processing completed: " + message); + } + + private void returnHold(MessageAction nextAction) { + Thread t = new Thread(() -> asyncQueue(nextAction), + message.getQueue().getType() + "::" + message.getQueue().getId() + "::" + message.getMessageId()); + t.start(); + messageDao.updateMessageResponse(message.getMessageId(), new Date(), nextAction.getReturnResponse()); + } + + private void asyncQueue(MessageAction nextAction) { + Event event = waitForNewAction(nextAction.getHoldTime()); + + while (true) { + MessageFunction func = new MessageFunction(event); + func.exec(); + + nextAction = func.getNextAction(); + if (nextAction == null) { + processMessage(message.getRequest()); + return; + } + + switch (nextAction.getAction()) { + case PROCESS: + processMessage(message.getRequest()); + return; + + case HOLD: { + event = waitForNewAction(nextAction.getHoldTime()); + break; + } + + default: + return; + } + } + } + + private Event waitForNewAction(int holdTime) { + long startTime = System.currentTimeMillis(); + long currentTime = startTime; + while (currentTime - startTime <= (holdTime + 1) * 1000) { + try { + Thread.sleep(5000); + } catch (Exception e) { + } + + MessageAction nextAction = messageDao.getNextAction(message.getMessageId()); + if (nextAction != null && nextAction.getAction() != MessageActionValue.HOLD) { + return Event.AWAKEN; + } + + currentTime = System.currentTimeMillis(); + } + return Event.CHECK; + } + + @Override + public void processResponse(MessageData response) { + if (message == null) { + return; + } + + String resolution = null; + if (message.getQueue() != null && handler != null) { + MessageFunction func = new MessageFunction(Event.COMPLETED); + func.exec(); + MessageAction nextAction = func.getNextAction(); + if (nextAction != null) { + resolution = nextAction.getResolution(); + } + } + + Date now = new Date(); + messageDao.updateMessageResponse(message.getMessageId(), now, response); + messageDao.updateMessageCompleted(message.getMessageId(), resolution, now); + log.info("Message processing completed: " + message); + } + + private class MessageFunction extends SynchronizedFunction { + + private Event event; + + private MessageAction nextAction = null; // Output + + public MessageFunction(Event event) { + super(lockHelper, Collections.singleton(message.getQueue().getType() + "::" + message.getQueue().getId()), + lockTimeout); + this.event = event; + } + + @Override + protected void _exec() { + List<Message> messageQueue = messageDao.readMessageQueue(message.getQueue()); + if (event == Event.AWAKEN) { + for (Message m : messageQueue) { + if (m.getMessageId() == message.getMessageId()) { + nextAction = m.getActionHistory().get(0); + } + } + if (nextAction != null) { + messageDao.updateActionDone(nextAction.getActionId(), new Date()); + } + } else { + Map<Long, MessageAction> nextActionMap = handler.nextAction(event, message, messageQueue); + if (nextActionMap != null) { + for (Message m : messageQueue) { + MessageAction action = nextActionMap.get(m.getMessageId()); + if (action != null) { + if (m.getMessageId() == message.getMessageId()) { + action.setDone(); + messageDao.addAction(m.getMessageId(), action); + nextAction = action; + } else { + MessageAction lastAction = m.getActionHistory().get(0); + if (lastAction.getActionStatus() != ActionStatus.PENDING || !action.same(lastAction)) { + messageDao.addAction(m.getMessageId(), action); + } + } + } + } + } + } + if (nextAction != null) { + log.info("Next message action: " + message + ":" + nextAction.getAction()); + MessageStatus status = determineStatus(nextAction); + if (status != null) { + messageDao.addStatus(message.getMessageId(), status); + log.info("Updating message status: " + message + ":" + status.getStatus()); + } + } + } + + public MessageAction getNextAction() { + return nextAction; + } + } + + private MessageStatus determineStatus(MessageAction action) { + if (action == null) { + return null; + } + + switch (action.getAction()) { + case PROCESS: + return new MessageStatus(MessageStatusValue.PROCESSING_SYNC); + case HOLD: + case RETURN_HOLD: + return new MessageStatus(MessageStatusValue.IN_QUEUE); + case RETURN_PROCESS: + return new MessageStatus(MessageStatusValue.PROCESSING_ASYNC); + case RETURN_COMPLETE: + return new MessageStatus(MessageStatusValue.COMPLETED); + } + return null; + } + + public void setMessageDao(MessageDao messageDao) { + this.messageDao = messageDao; + } + + public void setLockHelper(LockHelper lockHelper) { + this.lockHelper = lockHelper; + } + + public void setLockTimeout(int lockTimeout) { + this.lockTimeout = lockTimeout; + } + + public void setMessageClassifier(MessageClassifier messageClassifier) { + this.messageClassifier = messageClassifier; + } + + public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) { + this.handlerMap = handlerMap; + } + + public void setMessageProcessor(MessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor; + } } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java index 73c3d8b6a..b8296ec19 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java @@ -13,7 +13,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; - import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; @@ -31,206 +30,199 @@ import org.onap.ccsdk.features.lib.doorman.data.MessageData; public class MessageInterceptorFilter implements Filter { - private MessageInterceptorFactory messageInterceptorFactory; - - @Override - public void init(FilterConfig filterConfig) throws ServletException { - } - - @Override - public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { - RequestWrapper req = new RequestWrapper((HttpServletRequest) request); - MessageData requestData = getMessageRequest(req); - - MessageInterceptor interceptor = messageInterceptorFactory.create(); - MessageData responseData = interceptor.processRequest(requestData); - - if (responseData == null) { - ResponseWrapper res = new ResponseWrapper((HttpServletResponse) response); - - chain.doFilter(req, res); - - responseData = res.getMessageResponse(); - interceptor.processResponse(responseData); - } - - setMessageResponse((HttpServletResponse) response, responseData); - } - - @SuppressWarnings("unchecked") - private void setMessageResponse(HttpServletResponse response, MessageData responseData) throws IOException { - if (responseData.param != null) { - String contentType = (String) responseData.param.get("content_type"); - if (contentType != null) { - response.setContentType(contentType); - } - Integer httpCode = (Integer) responseData.param.get("http_code"); - if (httpCode != null) { - response.setStatus(httpCode); - } - Map<String, Object> headers = (Map<String, Object>) responseData.param.get("headers"); - if (headers != null) { - for (Entry<String, Object> entry : headers.entrySet()) { - String name = entry.getKey(); - Object v = entry.getValue(); - if (v instanceof List) { - List<Object> ll = (List<Object>) v; - for (Object o : ll) { - response.addHeader(name, o.toString()); - } - } else { - response.setHeader(name, v.toString()); - } - } - } - } - - if (responseData.body != null) { - response.setContentLength(responseData.body.length()); - response.getWriter().write(responseData.body); - } - } - - @SuppressWarnings("unchecked") - private MessageData getMessageRequest(RequestWrapper request) { - MessageData requestData = new MessageData(); - requestData.param = new HashMap<>(); - requestData.param.put("http_method", request.getMethod()); - requestData.param.put("uri", request.getPathInfo()); - requestData.param.put("param", request.getParameterMap()); - requestData.param.put("content_type", request.getContentType()); - - Map<String, Object> headers = new HashMap<>(); - Enumeration<String> headerNames = request.getHeaderNames(); - while (headerNames.hasMoreElements()) { - String name = headerNames.nextElement(); - Enumeration<String> values = request.getHeaders(name); - List<String> valueList = new ArrayList<>(); - while (values.hasMoreElements()) { - valueList.add(values.nextElement()); - } - if (valueList.size() > 1) { - headers.put(name, valueList); - } else { - headers.put(name, valueList.get(0)); - } - } - requestData.param.put("headers", headers); - - requestData.body = request.getBody(); - - return requestData; - } - - @Override - public void destroy() { - } - - private static class RequestWrapper extends HttpServletRequestWrapper { - - private final String body; - - public RequestWrapper(HttpServletRequest request) throws IOException { - super(request); - - StringBuilder stringBuilder = new StringBuilder(); - try (InputStream inputStream = request.getInputStream()) { - if (inputStream != null) { - try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) { - char[] charBuffer = new char[128]; - int bytesRead = -1; - while ((bytesRead = bufferedReader.read(charBuffer)) > 0) { - stringBuilder.append(charBuffer, 0, bytesRead); - } - } - } - } - body = stringBuilder.toString(); - } - - @Override - public ServletInputStream getInputStream() throws IOException { - final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body.getBytes()); - ServletInputStream servletInputStream = new ServletInputStream() { - - @Override - public int read() throws IOException { - return byteArrayInputStream.read(); - } - }; - return servletInputStream; - } - - @Override - public BufferedReader getReader() throws IOException { - return new BufferedReader(new InputStreamReader(getInputStream())); - } - - public String getBody() { - return body; - } - } - - public class ResponseWrapper extends HttpServletResponseWrapper { - - private CharArrayWriter writer = new CharArrayWriter(); - private Map<String, Object> param = new HashMap<>(); - - public ResponseWrapper(HttpServletResponse response) { - super(response); - } - - @Override - public PrintWriter getWriter() { - return new PrintWriter(writer); - } - - @Override - public void setStatus(int status) { - param.put("http_code", status); - } - - @SuppressWarnings("unchecked") - @Override - public void setHeader(String name, String value) { - Map<String, Object> headers = (Map<String, Object>) param.get("headers"); - if (headers == null) { - headers = new HashMap<>(); - param.put("headers", headers); - } - headers.put(name, value); - } - - @SuppressWarnings("unchecked") - @Override - public void addHeader(String name, String value) { - Map<String, Object> headers = (Map<String, Object>) param.get("headers"); - if (headers == null) { - headers = new HashMap<>(); - param.put("headers", headers); - } - Object v = headers.get(name); - if (v == null) { - headers.put(name, value); - } else if (v instanceof List) { - ((List<Object>) v).add(value); - } else { - List<Object> ll = new ArrayList<>(); - ll.add(v); - ll.add(value); - headers.put(name, ll); - } - } - - public MessageData getMessageResponse() { - MessageData responseData = new MessageData(); - responseData.param = param; - responseData.body = writer.toString(); - return responseData; - } - } - - public void setMessageInterceptorFactory(MessageInterceptorFactory messageInterceptorFactory) { - this.messageInterceptorFactory = messageInterceptorFactory; - } + private MessageInterceptorFactory messageInterceptorFactory; + + @Override + public void init(FilterConfig filterConfig) throws ServletException {} + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + RequestWrapper req = new RequestWrapper((HttpServletRequest) request); + MessageData requestData = getMessageRequest(req); + + MessageInterceptor interceptor = messageInterceptorFactory.create(); + MessageData responseData = interceptor.processRequest(requestData); + + if (responseData == null) { + ResponseWrapper res = new ResponseWrapper((HttpServletResponse) response); + + chain.doFilter(req, res); + + responseData = res.getMessageResponse(); + interceptor.processResponse(responseData); + } + + setMessageResponse((HttpServletResponse) response, responseData); + } + + @SuppressWarnings("unchecked") + private void setMessageResponse(HttpServletResponse response, MessageData responseData) throws IOException { + if (responseData.getParam() != null) { + String contentType = (String) responseData.getParam().get("content_type"); + if (contentType != null) { + response.setContentType(contentType); + } + Integer httpCode = (Integer) responseData.getParam().get("http_code"); + if (httpCode != null) { + response.setStatus(httpCode); + } + Map<String, Object> headers = (Map<String, Object>) responseData.getParam().get("headers"); + if (headers != null) { + for (Entry<String, Object> entry : headers.entrySet()) { + String name = entry.getKey(); + Object v = entry.getValue(); + if (v instanceof List) { + List<Object> ll = (List<Object>) v; + for (Object o : ll) { + response.addHeader(name, o.toString()); + } + } else { + response.setHeader(name, v.toString()); + } + } + } + } + + if (responseData.getBody() != null) { + response.setContentLength(responseData.getBody().length()); + response.getWriter().write(responseData.getBody()); + } + } + + @SuppressWarnings("unchecked") + private MessageData getMessageRequest(RequestWrapper request) { + HashMap<String, Object> param = new HashMap<>(); + param.put("http_method", request.getMethod()); + param.put("uri", request.getPathInfo()); + param.put("param", request.getParameterMap()); + param.put("content_type", request.getContentType()); + + Map<String, Object> headers = new HashMap<>(); + Enumeration<String> headerNames = request.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String name = headerNames.nextElement(); + Enumeration<String> values = request.getHeaders(name); + List<String> valueList = new ArrayList<>(); + while (values.hasMoreElements()) { + valueList.add(values.nextElement()); + } + if (valueList.size() > 1) { + headers.put(name, valueList); + } else { + headers.put(name, valueList.get(0)); + } + } + param.put("headers", headers); + + return new MessageData(param, request.getBody()); + } + + @Override + public void destroy() {} + + private static class RequestWrapper extends HttpServletRequestWrapper { + + private final String body; + + public RequestWrapper(HttpServletRequest request) throws IOException { + super(request); + + StringBuilder stringBuilder = new StringBuilder(); + try (InputStream inputStream = request.getInputStream()) { + if (inputStream != null) { + try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) { + char[] charBuffer = new char[128]; + int bytesRead = -1; + while ((bytesRead = bufferedReader.read(charBuffer)) > 0) { + stringBuilder.append(charBuffer, 0, bytesRead); + } + } + } + } + body = stringBuilder.toString(); + } + + @Override + public ServletInputStream getInputStream() throws IOException { + final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body.getBytes()); + ServletInputStream servletInputStream = new ServletInputStream() { + + @Override + public int read() throws IOException { + return byteArrayInputStream.read(); + } + }; + return servletInputStream; + } + + @Override + public BufferedReader getReader() throws IOException { + return new BufferedReader(new InputStreamReader(getInputStream())); + } + + public String getBody() { + return body; + } + } + + public class ResponseWrapper extends HttpServletResponseWrapper { + + private CharArrayWriter writer = new CharArrayWriter(); + private Map<String, Object> param = new HashMap<>(); + + public ResponseWrapper(HttpServletResponse response) { + super(response); + } + + @Override + public PrintWriter getWriter() { + return new PrintWriter(writer); + } + + @Override + public void setStatus(int status) { + param.put("http_code", status); + } + + @SuppressWarnings("unchecked") + @Override + public void setHeader(String name, String value) { + Map<String, Object> headers = (Map<String, Object>) param.get("headers"); + if (headers == null) { + headers = new HashMap<>(); + param.put("headers", headers); + } + headers.put(name, value); + } + + @SuppressWarnings("unchecked") + @Override + public void addHeader(String name, String value) { + Map<String, Object> headers = (Map<String, Object>) param.get("headers"); + if (headers == null) { + headers = new HashMap<>(); + param.put("headers", headers); + } + Object v = headers.get(name); + if (v == null) { + headers.put(name, value); + } else if (v instanceof List) { + ((List<Object>) v).add(value); + } else { + List<Object> ll = new ArrayList<>(); + ll.add(v); + ll.add(value); + headers.put(name, ll); + } + } + + public MessageData getMessageResponse() { + return new MessageData(param, writer.toString()); + } + } + + public void setMessageInterceptorFactory(MessageInterceptorFactory messageInterceptorFactory) { + this.messageInterceptorFactory = messageInterceptorFactory; + } } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java index 42f1b21e7..fefd25fb9 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java @@ -5,269 +5,281 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DataUtil { - @SuppressWarnings("unused") - private static final Logger log = LoggerFactory.getLogger(DataUtil.class); - - @SuppressWarnings("unchecked") - public static Object get(Object struct, String compositeKey) { - if (struct == null || compositeKey == null) { - return null; - } - - List<Object> keys = splitCompositeKey(compositeKey); - Object currentValue = struct; - String currentKey = ""; - - for (Object key : keys) { - if (key instanceof Integer) { - if (!(currentValue instanceof List)) { - throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); - } - - Integer keyi = (Integer) key; - List<Object> currentValueL = (List<Object>) currentValue; - - if (keyi >= currentValueL.size()) { - return null; - } - - currentValue = currentValueL.get(keyi); - if (currentValue == null) { - return null; - } - - currentKey += "[" + key + "]"; - } else { - if (!(currentValue instanceof Map)) { - throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); - } - - currentValue = ((Map<String, Object>) currentValue).get(key); - if (currentValue == null) { - return null; - } - - currentKey += "." + key; - } - } - return currentValue; - } - - @SuppressWarnings("unchecked") - public static void set(Object struct, String compositeKey, Object value) { - if (struct == null) { - throw new IllegalArgumentException("Null argument: struct"); - } - - if (compositeKey == null || compositeKey.length() == 0) { - throw new IllegalArgumentException("Null or empty argument: compositeKey"); - } - - if (value == null) { - return; - } - - List<Object> keys = splitCompositeKey(compositeKey); - Object currentValue = struct; - String currentKey = ""; - - for (int i = 0; i < keys.size() - 1; i++) { - Object key = keys.get(i); - - if (key instanceof Integer) { - if (!(currentValue instanceof List)) { - throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); - } - - Integer keyi = (Integer) key; - List<Object> currentValueL = (List<Object>) currentValue; - int size = currentValueL.size(); - - if (keyi >= size) { - for (int k = 0; k < keyi - size + 1; k++) { - currentValueL.add(null); - } - } - - Object newValue = currentValueL.get(keyi); - if (newValue == null) { - Object nextKey = keys.get(i + 1); - if (nextKey instanceof Integer) { - newValue = new ArrayList<>(); - } else { - newValue = new HashMap<>(); - } - currentValueL.set(keyi, newValue); - } - - currentValue = newValue; - currentKey += "[" + key + "]"; - - } else { - if (!(currentValue instanceof Map)) { - throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); - } - - Object newValue = ((Map<String, Object>) currentValue).get(key); - if (newValue == null) { - Object nextKey = keys.get(i + 1); - if (nextKey instanceof Integer) { - newValue = new ArrayList<>(); - } else { - newValue = new HashMap<>(); - } - ((Map<String, Object>) currentValue).put((String) key, newValue); - } - - currentValue = newValue; - currentKey += "." + key; - } - } - - Object key = keys.get(keys.size() - 1); - if (key instanceof Integer) { - if (!(currentValue instanceof List)) { - throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); - } - - Integer keyi = (Integer) key; - List<Object> currentValueL = (List<Object>) currentValue; - int size = currentValueL.size(); - - if (keyi >= size) { - for (int k = 0; k < keyi - size + 1; k++) { - currentValueL.add(null); - } - } - - currentValueL.set(keyi, value); - - } else { - if (!(currentValue instanceof Map)) { - throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); - } - - ((Map<String, Object>) currentValue).put((String) key, value); - } - } - - @SuppressWarnings("unchecked") - public static void delete(Object struct, String compositeKey) { - if (struct == null) { - throw new IllegalArgumentException("Null argument: struct"); - } - - if (compositeKey == null) { - throw new IllegalArgumentException("Null argument: compositeKey"); - } - - List<Object> keys = splitCompositeKey(compositeKey); - Object currentValue = struct; - String currentKey = ""; - - for (int i = 0; i < keys.size() - 1; i++) { - Object key = keys.get(i); - - if (key instanceof Integer) { - if (!(currentValue instanceof List)) { - throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); - } - - Integer keyi = (Integer) key; - List<Object> currentValueL = (List<Object>) currentValue; - - if (keyi >= currentValueL.size()) { - return; - } - - currentValue = currentValueL.get(keyi); - if (currentValue == null) { - return; - } - - currentKey += "[" + key + "]"; - - } else { - if (!(currentValue instanceof Map)) { - throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); - } - - currentValue = ((Map<String, Object>) currentValue).get(key); - if (currentValue == null) { - return; - } - - currentKey += "." + key; - } - } - - Object key = keys.get(keys.size() - 1); - if (key instanceof Integer) { - if (!(currentValue instanceof List)) { - throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); - } - - Integer keyi = (Integer) key; - List<Object> currentValueL = (List<Object>) currentValue; - - if (keyi < currentValueL.size()) { - currentValueL.remove(keyi); - } - - } else { - if (!(currentValue instanceof Map)) { - throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); - } - - ((Map<String, Object>) currentValue).remove(key); - } - } - - private static List<Object> splitCompositeKey(String compositeKey) { - if (compositeKey == null) { - return Collections.emptyList(); - } - - String[] ss = compositeKey.split("\\."); - List<Object> ll = new ArrayList<>(); - for (String s : ss) { - if (s.length() == 0) { - continue; - } - - int i1 = s.indexOf('['); - if (i1 < 0) { - ll.add(s); - } else { - if (!s.endsWith("]")) { - throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": No matching ] found"); - } - - String s1 = s.substring(0, i1); - if (s1.length() > 0) { - ll.add(s1); - } - - String s2 = s.substring(i1 + 1, s.length() - 1); - try { - int n = Integer.parseInt(s2); - if (n < 0) { - throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": Index must be >= 0: " + n); - } - - ll.add(n); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": Index not a number: " + s2); - } - } - } - - return ll; - } + @SuppressWarnings("unused") + private static final Logger log = LoggerFactory.getLogger(DataUtil.class); + + @SuppressWarnings("unchecked") + public static Object get(Object struct, String compositeKey) { + if (struct == null || compositeKey == null) { + return null; + } + + List<Object> keys = splitCompositeKey(compositeKey); + Object currentValue = struct; + String currentKey = ""; + + for (Object key : keys) { + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + + if (keyi >= currentValueL.size()) { + return null; + } + + currentValue = currentValueL.get(keyi); + if (currentValue == null) { + return null; + } + + currentKey += "[" + key + "]"; + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + + currentKey + "', but '" + currentKey + "' is not a map"); + } + + currentValue = ((Map<String, Object>) currentValue).get(key); + if (currentValue == null) { + return null; + } + + currentKey += "." + key; + } + } + return currentValue; + } + + @SuppressWarnings("unchecked") + public static void set(Object struct, String compositeKey, Object value) { + if (struct == null) { + throw new IllegalArgumentException("Null argument: struct"); + } + + if (compositeKey == null || compositeKey.length() == 0) { + throw new IllegalArgumentException("Null or empty argument: compositeKey"); + } + + if (value == null) { + return; + } + + List<Object> keys = splitCompositeKey(compositeKey); + Object currentValue = struct; + String currentKey = ""; + + for (int i = 0; i < keys.size() - 1; i++) { + Object key = keys.get(i); + + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + int size = currentValueL.size(); + + if (keyi >= size) { + for (int k = 0; k < keyi - size + 1; k++) { + currentValueL.add(null); + } + } + + Object newValue = currentValueL.get(keyi); + if (newValue == null) { + Object nextKey = keys.get(i + 1); + if (nextKey instanceof Integer) { + newValue = new ArrayList<>(); + } else { + newValue = new HashMap<>(); + } + currentValueL.set(keyi, newValue); + } + + currentValue = newValue; + currentKey += "[" + key + "]"; + + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + + currentKey + "', but '" + currentKey + "' is not a map"); + } + + Object newValue = ((Map<String, Object>) currentValue).get(key); + if (newValue == null) { + Object nextKey = keys.get(i + 1); + if (nextKey instanceof Integer) { + newValue = new ArrayList<>(); + } else { + newValue = new HashMap<>(); + } + ((Map<String, Object>) currentValue).put((String) key, newValue); + } + + currentValue = newValue; + currentKey += "." + key; + } + } + + Object key = keys.get(keys.size() - 1); + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + int size = currentValueL.size(); + + if (keyi >= size) { + for (int k = 0; k < keyi - size + 1; k++) { + currentValueL.add(null); + } + } + + currentValueL.set(keyi, value); + + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + + "', but '" + currentKey + "' is not a map"); + } + + ((Map<String, Object>) currentValue).put((String) key, value); + } + } + + @SuppressWarnings("unchecked") + public static void delete(Object struct, String compositeKey) { + if (struct == null) { + throw new IllegalArgumentException("Null argument: struct"); + } + + if (compositeKey == null) { + throw new IllegalArgumentException("Null argument: compositeKey"); + } + + List<Object> keys = splitCompositeKey(compositeKey); + Object currentValue = struct; + String currentKey = ""; + + for (int i = 0; i < keys.size() - 1; i++) { + Object key = keys.get(i); + + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + + if (keyi >= currentValueL.size()) { + return; + } + + currentValue = currentValueL.get(keyi); + if (currentValue == null) { + return; + } + + currentKey += "[" + key + "]"; + + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + + currentKey + "', but '" + currentKey + "' is not a map"); + } + + currentValue = ((Map<String, Object>) currentValue).get(key); + if (currentValue == null) { + return; + } + + currentKey += "." + key; + } + } + + Object key = keys.get(keys.size() - 1); + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + + if (keyi < currentValueL.size()) { + currentValueL.remove(keyi); + } + + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + + "', but '" + currentKey + "' is not a map"); + } + + ((Map<String, Object>) currentValue).remove(key); + } + } + + private static List<Object> splitCompositeKey(String compositeKey) { + if (compositeKey == null) { + return Collections.emptyList(); + } + + String[] ss = compositeKey.split("\\."); + List<Object> ll = new ArrayList<>(); + for (String s : ss) { + if (s.length() == 0) { + continue; + } + + int i1 = s.indexOf('['); + if (i1 < 0) { + ll.add(s); + } else { + if (!s.endsWith("]")) { + throw new IllegalArgumentException( + "Invalid composite key: " + compositeKey + ": No matching ] found"); + } + + String s1 = s.substring(0, i1); + if (s1.length() > 0) { + ll.add(s1); + } + + String s2 = s.substring(i1 + 1, s.length() - 1); + try { + int n = Integer.parseInt(s2); + if (n < 0) { + throw new IllegalArgumentException( + "Invalid composite key: " + compositeKey + ": Index must be >= 0: " + n); + } + + ll.add(n); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Invalid composite key: " + compositeKey + ": Index not a number: " + s2); + } + } + } + + return ll; + } } diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java index fc9ce0936..8d45a5f22 100644 --- a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java @@ -7,313 +7,317 @@ import java.util.Map; public class JsonUtil { - public static Object jsonToData(String s) { - if (s == null) { - return null; - } - return jsonToData(new Current(s)); - } - - private static class Current { - - public String s; - public int i; - public int line, pos; - - public Current(String s) { - this.s = s; - i = 0; - line = 1; - pos = 1; - } - - public void move() { - i++; - pos++; - } - - public void move(int k) { - i += k; - pos += k; - } - - public boolean end() { - return i >= s.length(); - } - - public char get() { - return s.charAt(i); - } - - public boolean is(String ss) { - return i < s.length() - ss.length() && s.substring(i, i + ss.length()).equals(ss); - } - - public boolean is(char c) { - return i < s.length() && s.charAt(i) == c; - } - - public void skipWhiteSpace() { - while (i < s.length() && s.charAt(i) <= 32) { - char cc = s.charAt(i); - if (cc == '\n') { - line++; - pos = 1; - } else if (cc == ' ' || cc == '\t') { - pos++; - } - i++; - } - } - } - - public static Object jsonToData(Current c) { - c.skipWhiteSpace(); - - if (c.end()) { - return ""; - } - - char cc = c.get(); - if (cc == '{') { - c.move(); - return jsonToMap(c); - } - if (cc == '[') { - c.move(); - return jsonToList(c); - } - return jsonToObject(c); - } - - private static Object jsonToObject(Current c) { - if (c.is('"')) { - c.move(); - StringBuilder ss = new StringBuilder(); - while (!c.end() && c.get() != '"') { - if (c.get() == '\\') { - c.move(); - char cc = c.get(); - switch (cc) { - case '\\': - ss.append('\\'); - break; - case '"': - ss.append('\"'); - break; - case 'n': - ss.append('\n'); - break; - case 'r': - ss.append('\r'); - break; - case 't': - ss.append('\t'); - break; - default: - throw new IllegalArgumentException("JSON parsing error: Invalid escaped character at (" + c.line + ':' + c.pos + "): " + cc); - } - } else { - ss.append(c.get()); - } - c.move(); - } - if (!c.end()) { - c.move(); // Skip the closing " - } - return ss.toString(); - } - if (c.is("true")) { - c.move(4); - return true; - } - if (c.is("false")) { - c.move(5); - return false; - } - if (c.is("null")) { - c.move(4); - return null; - } - StringBuilder ss = new StringBuilder(); - while (!c.end() && c.get() != ',' && c.get() != ']' && c.get() != '}' && c.get() != '\n' && c.get() != '\t') { - ss.append(c.get()); - c.move(); - } - try { - return Long.valueOf(ss.toString()); - } catch (Exception e1) { - try { - return Double.valueOf(ss.toString()); - } catch (Exception e2) { - throw new IllegalArgumentException("JSON parsing error: Invalid value at (" + c.line + ':' + c.pos + "): " + ss.toString()); - } - } - } - - private static List<Object> jsonToList(Current c) { - List<Object> ll = new ArrayList<>(); - c.skipWhiteSpace(); - while (!c.end() && c.get() != ']') { - Object value = jsonToData(c); - - ll.add(value); - - c.skipWhiteSpace(); - if (c.is(',')) { - c.move(); - c.skipWhiteSpace(); - } - } - - if (!c.end()) { - c.move(); // Skip the closing ] - } - - return ll; - } - - private static Map<String, Object> jsonToMap(Current c) { - Map<String, Object> mm = new HashMap<>(); - c.skipWhiteSpace(); - while (!c.end() && c.get() != '}') { - Object key = jsonToObject(c); - if (!(key instanceof String)) { - throw new IllegalArgumentException("JSON parsing error: Invalid key at (" + c.line + ':' + c.pos + "): " + key); - } - - c.skipWhiteSpace(); - if (!c.is(':')) { - throw new IllegalArgumentException("JSON parsing error: Expected ':' at (" + c.line + ':' + c.pos + ")"); - } - - c.move(); // Skip the : - Object value = jsonToData(c); - - mm.put((String) key, value); - - c.skipWhiteSpace(); - if (c.is(',')) { - c.move(); - c.skipWhiteSpace(); - } - } - - if (!c.end()) { - c.move(); // Skip the closing } - } - - return mm; - } - - public static String dataToJson(Object o, boolean escape) { - StringBuilder sb = new StringBuilder(); - str(sb, o, 0, false, escape); - return sb.toString(); - } - - public static String dataToJson(Object o) { - return dataToJson(o, true); - } - - @SuppressWarnings("unchecked") - private static void str(StringBuilder ss, Object o, int indent, boolean padFirst, boolean escape) { - if (o == null || o instanceof Boolean || o instanceof Number) { - if (padFirst) { - ss.append(pad(indent)); - } - ss.append(o); - return; - } - - if (o instanceof String) { - String s = escape ? escapeJson((String) o) : (String) o; - if (padFirst) { - ss.append(pad(indent)); - } - ss.append('"').append(s).append('"'); - return; - } - - if (o instanceof Number || o instanceof Boolean) { - if (padFirst) { - ss.append(pad(indent)); - } - ss.append(o); - return; - } - - if (o instanceof Map) { - Map<String, Object> mm = (Map<String, Object>) o; - - if (padFirst) { - ss.append(pad(indent)); - } - ss.append("{\n"); - - boolean first = true; - for (String k : mm.keySet()) { - if (!first) { - ss.append(",\n"); - } - first = false; - - Object v = mm.get(k); - ss.append(pad(indent + 1)); - if (escape) { - ss.append('"'); - } - ss.append(k); - if (escape) { - ss.append('"'); - } - ss.append(": "); - str(ss, v, indent + 1, false, escape); - } - - ss.append("\n"); - ss.append(pad(indent)).append('}'); - - return; - } - - if (o instanceof List) { - List<Object> ll = (List<Object>) o; - - if (padFirst) { - ss.append(pad(indent)); - } - ss.append("[\n"); - - boolean first = true; - for (Object o1 : ll) { - if (!first) { - ss.append(",\n"); - } - first = false; - - str(ss, o1, indent + 1, true, escape); - } - - ss.append("\n"); - ss.append(pad(indent)).append(']'); - } - } - - private static String escapeJson(String v) { - String s = v.replaceAll("\\\\", "\\\\\\\\"); - s = s.replaceAll("\"", "\\\\\""); - s = s.replaceAll("\n", "\\\\n"); - s = s.replaceAll("\r", "\\\\r"); - s = s.replaceAll("\t", "\\\\t"); - return s; - } - - private static String pad(int n) { - String s = ""; - for (int i = 0; i < n; i++) { - s += " "; - } - return s; - } + public static Object jsonToData(String s) { + if (s == null) { + return null; + } + return jsonToData(new Current(s)); + } + + private static class Current { + + public String s; + public int i; + public int line, pos; + + public Current(String s) { + this.s = s; + i = 0; + line = 1; + pos = 1; + } + + public void move() { + i++; + pos++; + } + + public void move(int k) { + i += k; + pos += k; + } + + public boolean end() { + return i >= s.length(); + } + + public char get() { + return s.charAt(i); + } + + public boolean is(String ss) { + return i < s.length() - ss.length() && s.substring(i, i + ss.length()).equals(ss); + } + + public boolean is(char c) { + return i < s.length() && s.charAt(i) == c; + } + + public void skipWhiteSpace() { + while (i < s.length() && s.charAt(i) <= 32) { + char cc = s.charAt(i); + if (cc == '\n') { + line++; + pos = 1; + } else if (cc == ' ' || cc == '\t') { + pos++; + } + i++; + } + } + } + + public static Object jsonToData(Current c) { + c.skipWhiteSpace(); + + if (c.end()) { + return ""; + } + + char cc = c.get(); + if (cc == '{') { + c.move(); + return jsonToMap(c); + } + if (cc == '[') { + c.move(); + return jsonToList(c); + } + return jsonToObject(c); + } + + private static Object jsonToObject(Current c) { + if (c.is('"')) { + c.move(); + StringBuilder ss = new StringBuilder(); + while (!c.end() && c.get() != '"') { + if (c.get() == '\\') { + c.move(); + char cc = c.get(); + switch (cc) { + case '\\': + ss.append('\\'); + break; + case '"': + ss.append('\"'); + break; + case 'n': + ss.append('\n'); + break; + case 'r': + ss.append('\r'); + break; + case 't': + ss.append('\t'); + break; + default: + throw new IllegalArgumentException("JSON parsing error: Invalid escaped character at (" + + c.line + ':' + c.pos + "): " + cc); + } + } else { + ss.append(c.get()); + } + c.move(); + } + if (!c.end()) { + c.move(); // Skip the closing " + } + return ss.toString(); + } + if (c.is("true")) { + c.move(4); + return true; + } + if (c.is("false")) { + c.move(5); + return false; + } + if (c.is("null")) { + c.move(4); + return null; + } + StringBuilder ss = new StringBuilder(); + while (!c.end() && c.get() != ',' && c.get() != ']' && c.get() != '}' && c.get() != '\n' && c.get() != '\t') { + ss.append(c.get()); + c.move(); + } + try { + return Long.valueOf(ss.toString()); + } catch (Exception e1) { + try { + return Double.valueOf(ss.toString()); + } catch (Exception e2) { + throw new IllegalArgumentException( + "JSON parsing error: Invalid value at (" + c.line + ':' + c.pos + "): " + ss.toString()); + } + } + } + + private static List<Object> jsonToList(Current c) { + List<Object> ll = new ArrayList<>(); + c.skipWhiteSpace(); + while (!c.end() && c.get() != ']') { + Object value = jsonToData(c); + + ll.add(value); + + c.skipWhiteSpace(); + if (c.is(',')) { + c.move(); + c.skipWhiteSpace(); + } + } + + if (!c.end()) { + c.move(); // Skip the closing ] + } + + return ll; + } + + private static Map<String, Object> jsonToMap(Current c) { + Map<String, Object> mm = new HashMap<>(); + c.skipWhiteSpace(); + while (!c.end() && c.get() != '}') { + Object key = jsonToObject(c); + if (!(key instanceof String)) { + throw new IllegalArgumentException( + "JSON parsing error: Invalid key at (" + c.line + ':' + c.pos + "): " + key); + } + + c.skipWhiteSpace(); + if (!c.is(':')) { + throw new IllegalArgumentException( + "JSON parsing error: Expected ':' at (" + c.line + ':' + c.pos + ")"); + } + + c.move(); // Skip the : + Object value = jsonToData(c); + + mm.put((String) key, value); + + c.skipWhiteSpace(); + if (c.is(',')) { + c.move(); + c.skipWhiteSpace(); + } + } + + if (!c.end()) { + c.move(); // Skip the closing } + } + + return mm; + } + + public static String dataToJson(Object o, boolean escape) { + StringBuilder sb = new StringBuilder(); + str(sb, o, 0, false, escape); + return sb.toString(); + } + + public static String dataToJson(Object o) { + return dataToJson(o, true); + } + + @SuppressWarnings("unchecked") + private static void str(StringBuilder ss, Object o, int indent, boolean padFirst, boolean escape) { + if (o == null || o instanceof Boolean || o instanceof Number) { + if (padFirst) { + ss.append(pad(indent)); + } + ss.append(o); + return; + } + + if (o instanceof String) { + String s = escape ? escapeJson((String) o) : (String) o; + if (padFirst) { + ss.append(pad(indent)); + } + ss.append('"').append(s).append('"'); + return; + } + + if (o instanceof Number || o instanceof Boolean) { + if (padFirst) { + ss.append(pad(indent)); + } + ss.append(o); + return; + } + + if (o instanceof Map) { + Map<String, Object> mm = (Map<String, Object>) o; + + if (padFirst) { + ss.append(pad(indent)); + } + ss.append("{\n"); + + boolean first = true; + for (String k : mm.keySet()) { + if (!first) { + ss.append(",\n"); + } + first = false; + + Object v = mm.get(k); + ss.append(pad(indent + 1)); + if (escape) { + ss.append('"'); + } + ss.append(k); + if (escape) { + ss.append('"'); + } + ss.append(": "); + str(ss, v, indent + 1, false, escape); + } + + ss.append("\n"); + ss.append(pad(indent)).append('}'); + + return; + } + + if (o instanceof List) { + List<Object> ll = (List<Object>) o; + + if (padFirst) { + ss.append(pad(indent)); + } + ss.append("[\n"); + + boolean first = true; + for (Object o1 : ll) { + if (!first) { + ss.append(",\n"); + } + first = false; + + str(ss, o1, indent + 1, true, escape); + } + + ss.append("\n"); + ss.append(pad(indent)).append(']'); + } + } + + private static String escapeJson(String v) { + String s = v.replaceAll("\\\\", "\\\\\\\\"); + s = s.replaceAll("\"", "\\\\\""); + s = s.replaceAll("\n", "\\\\n"); + s = s.replaceAll("\r", "\\\\r"); + s = s.replaceAll("\t", "\\\\t"); + return s; + } + + private static String pad(int n) { + String s = ""; + for (int i = 0; i < n; i++) { + s += " "; + } + return s; + } } diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java index 32e94e5d3..e14e32545 100644 --- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java @@ -8,84 +8,84 @@ import org.onap.ccsdk.features.lib.doorman.data.MessageStatus; public class MessageQueueDataItem implements Comparable<MessageQueueDataItem> { - public Date timeStamp; - public String extMessageId; - public MessageStatus status; - public MessageAction action; + public Date timeStamp; + public String extMessageId; + public MessageStatus status; + public MessageAction action; - @Override - public int compareTo(MessageQueueDataItem other) { - int c = timeStamp.compareTo(other.timeStamp); - if (c == 0) { - if (action != null && other.status != null) { - return -1; - } - if (status != null && other.action != null) { - return 1; - } - } - return c; - } + @Override + public int compareTo(MessageQueueDataItem other) { + int c = timeStamp.compareTo(other.timeStamp); + if (c == 0) { + if (action != null && other.status != null) { + return -1; + } + if (status != null && other.action != null) { + return 1; + } + } + return c; + } - @Override - public int hashCode() { - return System.identityHashCode(extMessageId); - } + @Override + public int hashCode() { + return System.identityHashCode(extMessageId); + } - @Override - public boolean equals(Object o) { - if (o == null || !(o instanceof MessageQueueDataItem)) { - return false; - } - MessageQueueDataItem other = (MessageQueueDataItem) o; - if (!extMessageId.equals(other.extMessageId)) { - return false; - } - if (status != null && (other.status == null || status.status != other.status.status)) { - return false; - } - if (action != null) { - if (other.action == null || action.action != other.action.action) { - return false; - } - if (action.action == MessageActionValue.HOLD || action.action == MessageActionValue.RETURN_HOLD) { - if (action.holdTime != other.action.holdTime) { - return false; - } - } else if (action.action == MessageActionValue.RETURN_COMPLETE) { - if (!action.resolution.equals(other.action.resolution)) { - return false; - } - } - } - return true; - } + @Override + public boolean equals(Object o) { + if (o == null || !(o instanceof MessageQueueDataItem)) { + return false; + } + MessageQueueDataItem other = (MessageQueueDataItem) o; + if (!extMessageId.equals(other.extMessageId)) { + return false; + } + if (status != null && (other.status == null || status.getStatus() != other.status.getStatus())) { + return false; + } + if (action != null) { + if (other.action == null || action.getAction() != other.action.getAction()) { + return false; + } + if (action.getAction() == MessageActionValue.HOLD || action.getAction() == MessageActionValue.RETURN_HOLD) { + if (action.getHoldTime() != other.action.getHoldTime()) { + return false; + } + } else if (action.getAction() == MessageActionValue.RETURN_COMPLETE) { + if (!action.getResolution().equals(other.action.getResolution())) { + return false; + } + } + } + return true; + } - @Override - public String toString() { - StringBuilder ss = new StringBuilder(); - if (timeStamp != null) { - ss.append(df.format(timeStamp)).append(" | "); - } - if (status != null) { - ss.append("STATUS: "); - } else { - ss.append("ACTION: "); - } - ss.append(String.format("%-20s | ", extMessageId)); - if (status != null) { - ss.append(status.status); - } else { - ss.append(action.action); - if (action.holdTime > 0) { - ss.append(" | ").append(action.holdTime); - } - if (action.resolution != null) { - ss.append(" | ").append(action.resolution); - } - } - return ss.toString(); - } + @Override + public String toString() { + StringBuilder ss = new StringBuilder(); + if (timeStamp != null) { + ss.append(df.format(timeStamp)).append(" | "); + } + if (status != null) { + ss.append("STATUS: "); + } else { + ss.append("ACTION: "); + } + ss.append(String.format("%-20s | ", extMessageId)); + if (status != null) { + ss.append(status.getStatus()); + } else { + ss.append(action.getAction()); + if (action.getHoldTime() > 0) { + ss.append(" | ").append(action.getHoldTime()); + } + if (action.getResolution() != null) { + ss.append(" | ").append(action.getResolution()); + } + } + return ss.toString(); + } - private static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); } diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java index ca1c4910d..b2f69dbcf 100644 --- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java @@ -32,191 +32,195 @@ import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) public class MessageQueueTest { - private static final Logger log = LoggerFactory.getLogger(MessageQueueTest.class); - - private String test; - - public MessageQueueTest(String test) { - this.test = test; - } - - @SuppressWarnings("unchecked") - @Test - public void test() throws Exception { - log.info("#########################################################################"); - log.info("MessageQueueTest: " + test + " started"); - - String testSetupJson = readResource("it/" + test + "/test.json"); - Map<String, Object> testSetup = (Map<String, Object>) JsonUtil.jsonToData(testSetupJson); - - Map<String, MessageQueueHandler> handlerMap = new HashMap<>(); - - for (Object o : (List<Object>) testSetup.get("handler_list")) { - Map<String, Object> handlerSetup = (Map<String, Object>) o; - String queueType = (String) handlerSetup.get("queue_type"); - String handlerClassName = (String) handlerSetup.get("handler_class"); - try { - Class<?> handlerClass = Class.forName("org.onap.ccsdk.features.lib.doorman.it." + test + "." + handlerClassName); - MessageQueueHandler handler = (MessageQueueHandler) handlerClass.newInstance(); - handlerMap.put(queueType, handler); - log.info("Handler found for queue type: " + queueType + ": " + handlerClass.getName()); - } catch (Exception e) { - } - } - - MessageInterceptorFactory factory = setupMessageInterceptorFactory(handlerMap, new Classifier(), new Processor()); - - List<Object> requestList = (List<Object>) testSetup.get("request_list"); - - List<Thread> threadList = new ArrayList<>(); - - for (int i = 0; i < requestList.size(); i++) { - Map<String, Object> requestSetup = (Map<String, Object>) requestList.get(i); - - MessageData request = new MessageData(); - request.param = (Map<String, Object>) requestSetup.get("request_param"); - Map<String, Object> requestBodyData = (Map<String, Object>) requestSetup.get("request_body"); - if (requestBodyData != null) { - request.body = JsonUtil.dataToJson(requestBodyData); - } else { - request.body = readResource("it/" + test + "/" + requestSetup.get("request_body_file")); - } - - MessageData response = new MessageData(); - response.param = (Map<String, Object>) requestSetup.get("response_param"); - Map<String, Object> responseBodyData = (Map<String, Object>) requestSetup.get("response_body"); - if (responseBodyData != null) { - response.body = JsonUtil.dataToJson(responseBodyData); - } else { - response.body = readResource("it/" + test + "/" + requestSetup.get("response_body_file")); - } - - long startTime = (Long) requestSetup.get("start_time"); - long processTime = (Long) requestSetup.get("process_time"); - - MessageInterceptor interceptor = factory.create(); - - Thread t = new Thread((Runnable) () -> { - try { - Thread.sleep(startTime); - } catch (InterruptedException e) { - } - - MessageData r = interceptor.processRequest(request); - - if (r == null) { - try { - Thread.sleep(processTime); - } catch (InterruptedException e) { - } - - interceptor.processResponse(response); - } - - }, "Message-" + i); - - threadList.add(t); - t.start(); - } - - for (Thread t : threadList) { - t.join(); - } - - log.info("MessageQueueTest: " + test + " completed"); - log.info("Result:"); - - String testResultJson = readResource("it/" + test + "/result.json"); - Map<String, Object> testResult = (Map<String, Object>) JsonUtil.jsonToData(testResultJson); - MessageQueueTestResult.checkResult(testResult); - } - - private static class Classifier implements MessageClassifier { - - @Override - public Queue determineQueue(MessageData request) { - Queue q = new Queue(); - q.type = (String) request.param.get("queue_type"); - q.id = (String) request.param.get("queue_id"); - return q; - } - - @Override - public String getExtMessageId(MessageData request) { - return (String) request.param.get("request_id"); - } - } - - private static class Processor implements MessageProcessor { - - @Override - public void processMessage(MessageData request) { - long processTime = (Long) request.param.get("process_time"); - try { - Thread.sleep(processTime); - } catch (InterruptedException e) { - } - } - } - - public static MessageInterceptorFactory setupMessageInterceptorFactory(Map<String, MessageQueueHandler> handlerMap, MessageClassifier classifier, MessageProcessor processor) { - LockHelperImpl lockHelper = new LockHelperImpl(); - lockHelper.setDataSource(DbUtil.getDataSource()); - lockHelper.setLockWait(5); - lockHelper.setRetryCount(10); - - MessageDaoImpl messageDao = new MessageDaoImpl(); - messageDao.setDataSource(DbUtil.getDataSource()); - - MessageInterceptorFactoryImpl f = new MessageInterceptorFactoryImpl(); - f.setMessageDao(messageDao); - f.setLockHelper(lockHelper); - f.setLockTimeout(20); - f.setHandlerMap(handlerMap); - f.setMessageClassifier(classifier); - f.setMessageProcessor(processor); - return f; - } - - @Parameters(name = "{0}") - public static Collection<Object[]> allTests() { - List<Object[]> ll = new ArrayList<>(); - - String[] tcList = list("it"); - for (String tc : tcList) { - ll.add(new Object[] { tc }); - } - return ll; - } - - private static String[] list(String dir) { - try { - ClassLoader loader = Thread.currentThread().getContextClassLoader(); - URL url = loader.getResource(dir); - String path = url.getPath(); - return new File(path).list(); - } catch (Exception e) { - log.warn("Error getting directory list for: " + dir + ": " + e.getMessage(), e); - return new String[0]; - } - } - - private static String readResource(String resource) { - try { - ClassLoader loader = Thread.currentThread().getContextClassLoader(); - InputStream ins = loader.getResourceAsStream(resource); - BufferedReader in = new BufferedReader(new InputStreamReader(ins)); - StringBuilder ss = new StringBuilder(); - String line = in.readLine(); - while (line != null) { - ss.append(line).append('\n'); - line = in.readLine(); - } - in.close(); - return ss.toString(); - } catch (Exception e) { - log.warn("Error reading resource: " + resource + ": " + e.getMessage(), e); - return null; - } - } + private static final Logger log = LoggerFactory.getLogger(MessageQueueTest.class); + + private String test; + + public MessageQueueTest(String test) { + this.test = test; + } + + @SuppressWarnings("unchecked") + @Test + public void test() throws Exception { + log.info("#########################################################################"); + log.info("MessageQueueTest: " + test + " started"); + + String testSetupJson = readResource("it/" + test + "/test.json"); + Map<String, Object> testSetup = (Map<String, Object>) JsonUtil.jsonToData(testSetupJson); + + Map<String, MessageQueueHandler> handlerMap = new HashMap<>(); + + for (Object o : (List<Object>) testSetup.get("handler_list")) { + Map<String, Object> handlerSetup = (Map<String, Object>) o; + String queueType = (String) handlerSetup.get("queue_type"); + String handlerClassName = (String) handlerSetup.get("handler_class"); + try { + Class<?> handlerClass = + Class.forName("org.onap.ccsdk.features.lib.doorman.it." + test + "." + handlerClassName); + MessageQueueHandler handler = (MessageQueueHandler) handlerClass.newInstance(); + handlerMap.put(queueType, handler); + log.info("Handler found for queue type: " + queueType + ": " + handlerClass.getName()); + } catch (Exception e) { + } + } + + MessageInterceptorFactory factory = + setupMessageInterceptorFactory(handlerMap, new Classifier(), new Processor()); + + List<Object> requestList = (List<Object>) testSetup.get("request_list"); + + List<Thread> threadList = new ArrayList<>(); + + for (int i = 0; i < requestList.size(); i++) { + Map<String, Object> requestSetup = (Map<String, Object>) requestList.get(i); + + Map<String, Object> requestParam = (Map<String, Object>) requestSetup.get("request_param"); + Map<String, Object> requestBodyData = (Map<String, Object>) requestSetup.get("request_body"); + String requestBody = null; + if (requestBodyData != null) { + requestBody = JsonUtil.dataToJson(requestBodyData); + } else { + requestBody = readResource("it/" + test + "/" + requestSetup.get("request_body_file")); + } + MessageData request = new MessageData(requestParam, requestBody); + + Map<String, Object> responseParam = (Map<String, Object>) requestSetup.get("response_param"); + Map<String, Object> responseBodyData = (Map<String, Object>) requestSetup.get("response_body"); + String responseBody = null; + if (responseBodyData != null) { + responseBody = JsonUtil.dataToJson(responseBodyData); + } else { + responseBody = readResource("it/" + test + "/" + requestSetup.get("response_body_file")); + } + MessageData response = new MessageData(responseParam, responseBody); + + long startTime = (Long) requestSetup.get("start_time"); + long processTime = (Long) requestSetup.get("process_time"); + + MessageInterceptor interceptor = factory.create(); + + Thread t = new Thread((Runnable) () -> { + try { + Thread.sleep(startTime); + } catch (InterruptedException e) { + } + + MessageData r = interceptor.processRequest(request); + + if (r == null) { + try { + Thread.sleep(processTime); + } catch (InterruptedException e) { + } + + interceptor.processResponse(response); + } + + }, "Message-" + i); + + threadList.add(t); + t.start(); + } + + for (Thread t : threadList) { + t.join(); + } + + log.info("MessageQueueTest: " + test + " completed"); + log.info("Result:"); + + String testResultJson = readResource("it/" + test + "/result.json"); + Map<String, Object> testResult = (Map<String, Object>) JsonUtil.jsonToData(testResultJson); + MessageQueueTestResult.checkResult(testResult); + } + + private static class Classifier implements MessageClassifier { + + @Override + public Queue determineQueue(MessageData request) { + String queueType = (String) request.getParam().get("queue_type"); + String queueId = (String) request.getParam().get("queue_id"); + return new Queue(queueType, queueId); + } + + @Override + public String getExtMessageId(MessageData request) { + return (String) request.getParam().get("request_id"); + } + } + + private static class Processor implements MessageProcessor { + + @Override + public void processMessage(MessageData request) { + long processTime = (Long) request.getParam().get("process_time"); + try { + Thread.sleep(processTime); + } catch (InterruptedException e) { + } + } + } + + public static MessageInterceptorFactory setupMessageInterceptorFactory(Map<String, MessageQueueHandler> handlerMap, + MessageClassifier classifier, MessageProcessor processor) { + LockHelperImpl lockHelper = new LockHelperImpl(); + lockHelper.setDataSource(DbUtil.getDataSource()); + lockHelper.setLockWait(5); + lockHelper.setRetryCount(10); + + MessageDaoImpl messageDao = new MessageDaoImpl(); + messageDao.setDataSource(DbUtil.getDataSource()); + + MessageInterceptorFactoryImpl f = new MessageInterceptorFactoryImpl(); + f.setMessageDao(messageDao); + f.setLockHelper(lockHelper); + f.setLockTimeout(20); + f.setHandlerMap(handlerMap); + f.setMessageClassifier(classifier); + f.setMessageProcessor(processor); + return f; + } + + @Parameters(name = "{0}") + public static Collection<Object[]> allTests() { + List<Object[]> ll = new ArrayList<>(); + + String[] tcList = list("it"); + for (String tc : tcList) { + ll.add(new Object[] {tc}); + } + return ll; + } + + private static String[] list(String dir) { + try { + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + URL url = loader.getResource(dir); + String path = url.getPath(); + return new File(path).list(); + } catch (Exception e) { + log.warn("Error getting directory list for: " + dir + ": " + e.getMessage(), e); + return new String[0]; + } + } + + private static String readResource(String resource) { + try { + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + InputStream ins = loader.getResourceAsStream(resource); + BufferedReader in = new BufferedReader(new InputStreamReader(ins)); + StringBuilder ss = new StringBuilder(); + String line = in.readLine(); + while (line != null) { + ss.append(line).append('\n'); + line = in.readLine(); + } + in.close(); + return ss.toString(); + } catch (Exception e) { + log.warn("Error reading resource: " + resource + ": " + e.getMessage(), e); + return null; + } + } } diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java index 71a2eeafb..9d297d52f 100644 --- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java @@ -1,7 +1,6 @@ package org.onap.ccsdk.features.lib.doorman.it; import static org.junit.Assert.fail; - import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -19,102 +18,101 @@ import org.slf4j.LoggerFactory; public class MessageQueueTestResult { - private static final Logger log = LoggerFactory.getLogger(MessageQueueTest.class); + private static final Logger log = LoggerFactory.getLogger(MessageQueueTest.class); - public static List<MessageQueueDataItem> readResult(String queueType, String queueId) { - MessageDaoImpl messageDao = new MessageDaoImpl(); - messageDao.setDataSource(DbUtil.getDataSource()); + public static List<MessageQueueDataItem> readResult(String queueType, String queueId) { + MessageDaoImpl messageDao = new MessageDaoImpl(); + messageDao.setDataSource(DbUtil.getDataSource()); - Queue q = new Queue(); - q.type = queueType; - q.id = queueId; + Queue q = new Queue(queueType, queueId); - List<Message> messageList = messageDao.readMessageQueue(q); + List<Message> messageList = messageDao.readMessageQueue(q); - List<MessageQueueDataItem> ll = new ArrayList<>(); - if (messageList != null) { - for (Message m : messageList) { - if (m.statusHistory != null) { - for (MessageStatus s : m.statusHistory) { - MessageQueueDataItem item = new MessageQueueDataItem(); - item.extMessageId = m.extMessageId; - item.status = s; - item.timeStamp = s.timestamp; - ll.add(item); - } - } - if (m.actionHistory != null) { - for (MessageAction a : m.actionHistory) { - MessageQueueDataItem item = new MessageQueueDataItem(); - item.extMessageId = m.extMessageId; - item.action = a; - item.timeStamp = a.timestamp; - ll.add(item); - } - } - } - } - Collections.sort(ll); - return ll; - } + List<MessageQueueDataItem> ll = new ArrayList<>(); + if (messageList != null) { + for (Message m : messageList) { + if (m.getStatusHistory() != null) { + for (MessageStatus s : m.getStatusHistory()) { + MessageQueueDataItem item = new MessageQueueDataItem(); + item.extMessageId = m.getExtMessageId(); + item.status = s; + item.timeStamp = s.getTimestamp(); + ll.add(item); + } + } + if (m.getActionHistory() != null) { + for (MessageAction a : m.getActionHistory()) { + MessageQueueDataItem item = new MessageQueueDataItem(); + item.extMessageId = m.getExtMessageId(); + item.action = a; + item.timeStamp = a.getTimestamp(); + ll.add(item); + } + } + } + } + Collections.sort(ll); + return ll; + } - @SuppressWarnings("unchecked") - public static void checkResult(Map<String, Object> testResult) { - List<Object> resultSetupList = (List<Object>) testResult.get("queue_list"); - if (resultSetupList != null) { - for (Object o : resultSetupList) { - Map<String, Object> resultSetup = (Map<String, Object>) o; - String queueType = (String) resultSetup.get("queue_type"); - String queueId = (String) resultSetup.get("queue_id"); - log.info(queueType + "::" + queueId); - List<MessageQueueDataItem> itemList = MessageQueueTestResult.readResult(queueType, queueId); - for (MessageQueueDataItem item : itemList) { - log.info(" --- " + item); - } + @SuppressWarnings("unchecked") + public static void checkResult(Map<String, Object> testResult) { + List<Object> resultSetupList = (List<Object>) testResult.get("queue_list"); + if (resultSetupList != null) { + for (Object o : resultSetupList) { + Map<String, Object> resultSetup = (Map<String, Object>) o; + String queueType = (String) resultSetup.get("queue_type"); + String queueId = (String) resultSetup.get("queue_id"); + log.info(queueType + "::" + queueId); + List<MessageQueueDataItem> itemList = MessageQueueTestResult.readResult(queueType, queueId); + for (MessageQueueDataItem item : itemList) { + log.info(" --- " + item); + } - List<Object> checkSequenceList = (List<Object>) resultSetup.get("check_sequence_list"); - for (int i = 0; i < checkSequenceList.size(); i++) { - List<Object> checkSequence = (List<Object>) checkSequenceList.get(i); - List<MessageQueueDataItem> checkItemList = new ArrayList<>(); - for (Object o2 : checkSequence) { - String[] ss = ((String) o2).split("\\|"); - MessageQueueDataItem item = new MessageQueueDataItem(); - item.extMessageId = ss[1].trim(); - if (ss[0].trim().equals("STATUS")) { - item.status = new MessageStatus(); - item.status.status = MessageStatusValue.valueOf(ss[2].trim()); - } else { - item.action = new MessageAction(); - item.action.action = MessageActionValue.valueOf(ss[2].trim()); - if (item.action.action == MessageActionValue.HOLD || item.action.action == MessageActionValue.RETURN_HOLD) { - item.action.holdTime = Integer.parseInt(ss[3].trim()); - } else if (item.action.action == MessageActionValue.RETURN_COMPLETE) { - item.action.resolution = ss[3].trim(); - } - } - checkItemList.add(item); - } - List<MessageQueueDataItem> itemList1 = new ArrayList<>(itemList); - itemList1.retainAll(checkItemList); - if (!itemList1.equals(checkItemList)) { - log.info("Expected sequence #" + i + " not found"); - log.info("Expected sequence:"); - for (MessageQueueDataItem item : checkItemList) { - log.info(" --- " + item); - } - log.info("Found sequence:"); - for (MessageQueueDataItem item : itemList1) { - log.info(" --- " + item); - } - fail("Expected sequence #" + i + " not found"); - } else { - log.info("Expected sequence #" + i + " found in the result:"); - for (MessageQueueDataItem item : checkItemList) { - log.info(" --- " + item); - } - } - } - } - } - } + List<Object> checkSequenceList = (List<Object>) resultSetup.get("check_sequence_list"); + for (int i = 0; i < checkSequenceList.size(); i++) { + List<Object> checkSequence = (List<Object>) checkSequenceList.get(i); + List<MessageQueueDataItem> checkItemList = new ArrayList<>(); + for (Object o2 : checkSequence) { + String[] ss = ((String) o2).split("\\|"); + MessageQueueDataItem item = new MessageQueueDataItem(); + item.extMessageId = ss[1].trim(); + if (ss[0].trim().equals("STATUS")) { + item.status = new MessageStatus(MessageStatusValue.valueOf(ss[2].trim())); + } else { + MessageActionValue action = MessageActionValue.valueOf(ss[2].trim()); + int holdTime = 0; + String resolution = null; + if (action == MessageActionValue.HOLD || action == MessageActionValue.RETURN_HOLD) { + holdTime = Integer.parseInt(ss[3].trim()); + } else if (action == MessageActionValue.RETURN_COMPLETE) { + resolution = ss[3].trim(); + } + item.action = new MessageAction(action, resolution, holdTime, null); + } + checkItemList.add(item); + } + List<MessageQueueDataItem> itemList1 = new ArrayList<>(itemList); + itemList1.retainAll(checkItemList); + if (!itemList1.equals(checkItemList)) { + log.info("Expected sequence #" + i + " not found"); + log.info("Expected sequence:"); + for (MessageQueueDataItem item : checkItemList) { + log.info(" --- " + item); + } + log.info("Found sequence:"); + for (MessageQueueDataItem item : itemList1) { + log.info(" --- " + item); + } + fail("Expected sequence #" + i + " not found"); + } else { + log.info("Expected sequence #" + i + " found in the result:"); + for (MessageQueueDataItem item : checkItemList) { + log.info(" --- " + item); + } + } + } + } + } + } } diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java deleted file mode 100644 index 254fd4ce4..000000000 --- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.onap.ccsdk.features.lib.doorman.it.tc1; - -import org.onap.ccsdk.features.lib.doorman.MessageClassifier; -import org.onap.ccsdk.features.lib.doorman.data.MessageData; -import org.onap.ccsdk.features.lib.doorman.data.Queue; - -public class Classifier implements MessageClassifier { - - @Override - public Queue determineQueue(MessageData request) { - Queue q = new Queue(); - q.type = "Cluster"; - q.id = "test-queue"; - return q; - } - - @Override - public String getExtMessageId(MessageData request) { - return (String) request.param.get("request_id"); - } -} diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java index f57fdbc36..8ee7b6a89 100644 --- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java @@ -4,7 +4,7 @@ import org.onap.ccsdk.features.lib.doorman.impl.MessageHandlerBaseImpl; public class Handler extends MessageHandlerBaseImpl { - public Handler() { + public Handler() { - } + } } diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java deleted file mode 100644 index a32f74650..000000000 --- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.onap.ccsdk.features.lib.doorman.it.tc2; - -import org.onap.ccsdk.features.lib.doorman.MessageClassifier; -import org.onap.ccsdk.features.lib.doorman.data.MessageData; -import org.onap.ccsdk.features.lib.doorman.data.Queue; - -public class Classifier implements MessageClassifier { - - @Override - public Queue determineQueue(MessageData request) { - Queue q = new Queue(); - q.type = "Cluster"; - q.id = "test-queue"; - return q; - } - - @Override - public String getExtMessageId(MessageData request) { - return (String) request.param.get("request_id"); - } -} diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java index 35b811c5f..0be64372d 100644 --- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java @@ -9,45 +9,44 @@ import org.onap.ccsdk.features.lib.doorman.util.JsonUtil; public class Handler extends MessageHandlerBaseImpl { - public Handler() { - setMaxParallelCount(100); - setUpdateWaitTime(20); - } + public Handler() { + setMaxParallelCount(100); + setUpdateWaitTime(20); + } - @SuppressWarnings("unchecked") - @Override - protected String determineUpdateGroup(Message msg) { - if (msg.request.body != null) { - Map<String, Object> body = (Map<String, Object>) JsonUtil.jsonToData(msg.request.body); - String op = (String) body.get("operation"); - String entityId = (String) body.get("entity_id"); - if ("update".equals(op)) { - return entityId; - } - } - return super.determineUpdateGroup(msg); - } + @SuppressWarnings("unchecked") + @Override + protected String determineUpdateGroup(Message msg) { + if (msg.getRequest().getBody() != null) { + Map<String, Object> body = (Map<String, Object>) JsonUtil.jsonToData(msg.getRequest().getBody()); + String op = (String) body.get("operation"); + String entityId = (String) body.get("entity_id"); + if ("update".equals(op)) { + return entityId; + } + } + return super.determineUpdateGroup(msg); + } - @Override - protected long determineUpdateSequence(Message msg) { - if (msg.request.param != null) { - Long n = (Long) msg.request.param.get("sequence_number"); - if (n != null) { - return n; - } - } - return super.determineUpdateSequence(msg); - } + @Override + protected long determineUpdateSequence(Message msg) { + if (msg.getRequest().getParam() != null) { + Long n = (Long) msg.getRequest().getParam().get("sequence_number"); + if (n != null) { + return n; + } + } + return super.determineUpdateSequence(msg); + } - @Override - protected MessageData completeResponse(Resolution r, Message msg) { - if (r == Resolution.SKIPPED) { - MessageData response = new MessageData(); - response.param = new HashMap<>(); - response.param.put("http_code", 200L); - response.body = "{ \"status\": \"success\" }"; - return response; - } - return super.completeResponse(r, msg); - } + @Override + protected MessageData completeResponse(Resolution r, Message msg) { + if (r == Resolution.SKIPPED) { + Map<String, Object> param = new HashMap<>(); + param.put("http_code", 200L); + String body = "{ \"status\": \"success\" }"; + return new MessageData(param, body); + } + return super.completeResponse(r, msg); + } } diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java index 1e525b4b2..b6ce6ca03 100644 --- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java @@ -6,87 +6,83 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; - import javax.sql.DataSource; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DbUtil { - private static final Logger log = LoggerFactory.getLogger(DbUtil.class); - - private static DataSource dataSource = null; - - public static synchronized DataSource getDataSource() { - if (dataSource == null) { - String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1"; - - dataSource = new DataSource() { - - @Override - public <T> T unwrap(Class<T> arg0) throws SQLException { - return null; - } - - @Override - public boolean isWrapperFor(Class<?> arg0) throws SQLException { - return false; - } - - @Override - public void setLoginTimeout(int arg0) throws SQLException { - } - - @Override - public void setLogWriter(PrintWriter arg0) throws SQLException { - } - - @Override - public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { - return null; - } - - @Override - public int getLoginTimeout() throws SQLException { - return 0; - } - - @Override - public PrintWriter getLogWriter() throws SQLException { - return null; - } - - @Override - public Connection getConnection(String username, String password) throws SQLException { - return null; - } - - @Override - public Connection getConnection() throws SQLException { - return DriverManager.getConnection(url); - } - }; - - try { - String script = FileUtil.read("/schema.sql"); - - String[] sqlList = script.split(";"); - try (Connection con = dataSource.getConnection()) { - for (String sql : sqlList) { - if (!sql.trim().isEmpty()) { - sql = sql.trim(); - try (PreparedStatement ps = con.prepareStatement(sql)) { - log.info("Executing statement:\n" + sql); - ps.execute(); - } - } - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - return dataSource; - } + private static final Logger log = LoggerFactory.getLogger(DbUtil.class); + + private static DataSource dataSource = null; + + public static synchronized DataSource getDataSource() { + if (dataSource == null) { + String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1"; + + dataSource = new DataSource() { + + @Override + public <T> T unwrap(Class<T> arg0) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class<?> arg0) throws SQLException { + return false; + } + + @Override + public void setLoginTimeout(int arg0) throws SQLException {} + + @Override + public void setLogWriter(PrintWriter arg0) throws SQLException {} + + @Override + public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { + return null; + } + + @Override + public int getLoginTimeout() throws SQLException { + return 0; + } + + @Override + public PrintWriter getLogWriter() throws SQLException { + return null; + } + + @Override + public Connection getConnection(String username, String password) throws SQLException { + return null; + } + + @Override + public Connection getConnection() throws SQLException { + return DriverManager.getConnection(url); + } + }; + + try { + String script = FileUtil.read("/schema.sql"); + + String[] sqlList = script.split(";"); + try (Connection con = dataSource.getConnection()) { + for (String sql : sqlList) { + if (!sql.trim().isEmpty()) { + sql = sql.trim(); + try (PreparedStatement ps = con.prepareStatement(sql)) { + log.info("Executing statement:\n" + sql); + ps.execute(); + } + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return dataSource; + } } diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java index 5ae92b4fc..4958fe376 100644 --- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java @@ -6,19 +6,19 @@ import java.io.InputStreamReader; public class FileUtil { - public static String read(String fileName) throws Exception { - String ss = ""; - try (InputStream is = DbUtil.class.getResourceAsStream(fileName)) { - try (InputStreamReader isr = new InputStreamReader(is)) { - try (BufferedReader in = new BufferedReader(isr)) { - String s = in.readLine(); - while (s != null) { - ss += s + '\n'; - s = in.readLine(); - } - } - } - } - return ss; - } + public static String read(String fileName) throws Exception { + String ss = ""; + try (InputStream is = DbUtil.class.getResourceAsStream(fileName)) { + try (InputStreamReader isr = new InputStreamReader(is)) { + try (BufferedReader in = new BufferedReader(isr)) { + String s = in.readLine(); + while (s != null) { + ss += s + '\n'; + s = in.readLine(); + } + } + } + } + return ss; + } } diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java index f0ac87e24..7d4bd9456 100644 --- a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java @@ -1,7 +1,6 @@ package org.onap.ccsdk.features.lib.doorman.util; import java.util.Map; - import org.junit.Assert; import org.junit.Test; import org.onap.ccsdk.features.lib.doorman.testutil.FileUtil; @@ -11,16 +10,16 @@ import org.slf4j.LoggerFactory; public class TestJsonUtil { - private static final Logger log = LoggerFactory.getLogger(TestJsonUtil.class); + private static final Logger log = LoggerFactory.getLogger(TestJsonUtil.class); - @SuppressWarnings("unchecked") - @Test - public void testJsonConversion() throws Exception { - String json1 = FileUtil.read("/test1.json"); - Map<String, Object> data1 = (Map<String, Object>) JsonUtil.jsonToData(json1); - String convJson1 = JsonUtil.dataToJson(data1); - log.info("Converted JSON:\n" + convJson1); - Map<String, Object> convData1 = (Map<String, Object>) JsonUtil.jsonToData(convJson1); - Assert.assertEquals(data1, convData1); - } + @SuppressWarnings("unchecked") + @Test + public void testJsonConversion() throws Exception { + String json1 = FileUtil.read("/test1.json"); + Map<String, Object> data1 = (Map<String, Object>) JsonUtil.jsonToData(json1); + String convJson1 = JsonUtil.dataToJson(data1); + log.info("Converted JSON:\n" + convJson1); + Map<String, Object> convData1 = (Map<String, Object>) JsonUtil.jsonToData(convJson1); + Assert.assertEquals(data1, convData1); + } } |