diff options
author | sb5356 <sb5356@att.com> | 2020-03-30 13:14:46 -0400 |
---|---|---|
committer | sb5356 <sb5356@att.com> | 2020-03-30 15:16:27 -0400 |
commit | 1c1791c5498dad7b7fd6b1591e0c5844d4c6c601 (patch) | |
tree | c4d718094ae13779ba2a858739b34945b48a8dde /lib/doorman/src/main | |
parent | 30691263393c3862ed0707220afeb1a9a44d7773 (diff) |
Add new modules: Resource Lock and Doorman
Issue-ID: CCSDK-2226
Signed-off-by: Stan Bonev <sb5356@att.com>
Change-Id: I30f83dd4a852fd185dbdaa9a833f5ba544d35ba1
Diffstat (limited to 'lib/doorman/src/main')
22 files changed, 2116 insertions, 0 deletions
diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java new file mode 100644 index 000000000..b6af1731e --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java @@ -0,0 +1,11 @@ +package org.onap.ccsdk.features.lib.doorman; + +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.data.Queue; + +public interface MessageClassifier { + + Queue determineQueue(MessageData request); + + String getExtMessageId(MessageData request); +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java new file mode 100644 index 000000000..1ff811baa --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java @@ -0,0 +1,10 @@ +package org.onap.ccsdk.features.lib.doorman; + +import org.onap.ccsdk.features.lib.doorman.data.MessageData; + +public interface MessageInterceptor { + + MessageData processRequest(MessageData request); + + void processResponse(MessageData response); +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java new file mode 100644 index 000000000..4c9886425 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java @@ -0,0 +1,6 @@ +package org.onap.ccsdk.features.lib.doorman; + +public interface MessageInterceptorFactory { + + MessageInterceptor create(); +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java new file mode 100644 index 000000000..0bb05a34c --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java @@ -0,0 +1,8 @@ +package org.onap.ccsdk.features.lib.doorman; + +import org.onap.ccsdk.features.lib.doorman.data.MessageData; + +public interface MessageProcessor { + + void processMessage(MessageData request); +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java new file mode 100644 index 000000000..4285393a4 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java @@ -0,0 +1,12 @@ +package org.onap.ccsdk.features.lib.doorman; + +import java.util.List; +import java.util.Map; +import org.onap.ccsdk.features.lib.doorman.data.Event; +import org.onap.ccsdk.features.lib.doorman.data.Message; +import org.onap.ccsdk.features.lib.doorman.data.MessageAction; + +public interface MessageQueueHandler { + + Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue); +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java new file mode 100644 index 000000000..764faa9c8 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java @@ -0,0 +1,30 @@ +package org.onap.ccsdk.features.lib.doorman.dao; + +import java.util.Date; +import java.util.List; +import org.onap.ccsdk.features.lib.doorman.data.Message; +import org.onap.ccsdk.features.lib.doorman.data.MessageAction; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatus; +import org.onap.ccsdk.features.lib.doorman.data.Queue; + +public interface MessageDao { + + long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp); + + void updateMessageStarted(long messageId, Date timestamp); + + void updateMessageCompleted(long messageId, String resolution, Date timestamp); + + void updateMessageResponse(long messageId, Date timestamp, MessageData response); + + void addStatus(long messageId, MessageStatus status); + + void addAction(long messageId, MessageAction action); + + void updateActionDone(long actionId, Date now); + + List<Message> readMessageQueue(Queue queue); + + MessageAction getNextAction(long messageId); +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java new file mode 100644 index 000000000..b3ecf3d4b --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java @@ -0,0 +1,307 @@ +package org.onap.ccsdk.features.lib.doorman.dao; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import javax.sql.DataSource; +import org.onap.ccsdk.features.lib.doorman.data.ActionStatus; +import org.onap.ccsdk.features.lib.doorman.data.Message; +import org.onap.ccsdk.features.lib.doorman.data.MessageAction; +import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatus; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue; +import org.onap.ccsdk.features.lib.doorman.data.Queue; +import org.onap.ccsdk.features.lib.doorman.util.JsonUtil; + +public class MessageDaoImpl implements MessageDao { + + private DataSource dataSource; + + @Override + public long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + long id = 0; + String sql = "INSERT INTO message (ext_message_id, request_param, request_body, arrived_timestamp, queue_type, queue_id) VALUES (?, ?, ?, ?, ?, ?)"; + try (PreparedStatement ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { + ps.setString(1, extMessageId); + ps.setString(2, JsonUtil.dataToJson(request.param)); + ps.setString(3, request.body); + ps.setTimestamp(4, new Timestamp(timestamp.getTime())); + if (queue != null) { + ps.setString(5, queue.type); + ps.setString(6, queue.id); + } else { + ps.setNull(5, Types.VARCHAR); + ps.setNull(6, Types.VARCHAR); + } + ps.executeUpdate(); + try (ResultSet rs = ps.getGeneratedKeys()) { + rs.next(); + id = rs.getLong(1); + } + } + con.commit(); + return id; + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error inserting message to DB: " + e.getMessage(), e); + } + } + + @Override + public void updateMessageStarted(long messageId, Date timestamp) { + updateMessageStatus("started_timestamp", messageId, null, timestamp); + } + + @Override + public void updateMessageCompleted(long messageId, String resolution, Date timestamp) { + updateMessageStatus("completed_timestamp", messageId, resolution, timestamp); + } + + private void updateMessageStatus(String timestampColumn, long messageId, String resolution, Date timestamp) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "UPDATE message SET " + timestampColumn + " = ? WHERE message_id = ?"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setTimestamp(1, new Timestamp(timestamp.getTime())); + ps.setLong(2, messageId); + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error updating message status in DB: " + e.getMessage(), e); + } + } + + @Override + public void updateMessageResponse(long messageId, Date timestamp, MessageData response) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "UPDATE message SET response_timestamp = ?, response_param = ?, response_body = ? WHERE message_id = ?"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setTimestamp(1, new Timestamp(timestamp.getTime())); + ps.setString(2, JsonUtil.dataToJson(response.param)); + ps.setString(3, response.body); + ps.setLong(4, messageId); + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error updating message response in DB: " + e.getMessage(), e); + } + } + + @Override + public void addStatus(long messageId, MessageStatus status) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "INSERT INTO message_status (message_id, status, status_timestamp) VALUES (?, ?, ?)"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, messageId); + ps.setString(2, status.status.toString()); + ps.setTimestamp(3, new Timestamp(status.timestamp.getTime())); + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error inserting message status to DB: " + e.getMessage(), e); + } + } + + @Override + public void addAction(long messageId, MessageAction action) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "INSERT INTO message_action (message_id, action, action_status, resolution, action_timestamp, done_timestamp, hold_time, response_param, response_body) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, messageId); + ps.setString(2, action.action.toString()); + ps.setString(3, action.actionStatus.toString()); + ps.setString(4, action.resolution); + ps.setTimestamp(5, new Timestamp(action.timestamp.getTime())); + if (action.doneTimestamp != null) { + ps.setTimestamp(6, new Timestamp(action.doneTimestamp.getTime())); + } else { + ps.setNull(6, Types.TIMESTAMP); + } + ps.setInt(7, action.holdTime); + if (action.returnResponse != null) { + ps.setString(8, JsonUtil.dataToJson(action.returnResponse.param)); + ps.setString(9, action.returnResponse.body); + } else { + ps.setNull(8, Types.VARCHAR); + ps.setNull(9, Types.VARCHAR); + } + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error inserting message action to DB: " + e.getMessage(), e); + } + } + + @Override + public void updateActionDone(long actionId, Date now) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "UPDATE message_action SET action_status = ?, done_timestamp = ? WHERE message_action_id = ?"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setString(1, ActionStatus.DONE.toString()); + ps.setTimestamp(2, new Timestamp(now.getTime())); + ps.setLong(3, actionId); + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error updating action in DB: " + e.getMessage(), e); + } + } + + @SuppressWarnings("unchecked") + @Override + public List<Message> readMessageQueue(Queue queue) { + List<Message> messageList = new ArrayList<>(); + try (Connection con = dataSource.getConnection()) { + String msql = "SELECT * FROM message WHERE queue_type = ? AND queue_id = ?"; + String ssql = "SELECT * FROM message_status WHERE message_id = ? ORDER BY message_status_id DESC"; + String asql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY message_action_id DESC"; + try (PreparedStatement mps = con.prepareStatement(msql); PreparedStatement sps = con.prepareStatement(ssql); PreparedStatement aps = con.prepareStatement(asql)) { + mps.setString(1, queue.type); + mps.setString(2, queue.id); + try (ResultSet mrs = mps.executeQuery()) { + while (mrs.next()) { + Message m = new Message(); + m.messageId = mrs.getLong("message_id"); + m.extMessageId = mrs.getString("ext_message_id"); + m.request = new MessageData(); + m.request.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("request_param")); + m.request.body = mrs.getString("request_body"); + m.response = new MessageData(); + m.response.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("response_param")); + m.response.body = mrs.getString("response_body"); + m.queue = new Queue(); + m.queue.type = mrs.getString("queue_type"); + m.queue.id = mrs.getString("queue_id"); + m.arrivedTimestamp = mrs.getTimestamp("arrived_timestamp"); + m.startedTimestamp = mrs.getTimestamp("started_timestamp"); + m.completedTimestamp = mrs.getTimestamp("completed_timestamp"); + m.responseTimestamp = mrs.getTimestamp("response_timestamp"); + m.statusHistory = new ArrayList<>(); + m.actionHistory = new ArrayList<>(); + messageList.add(m); + + sps.setLong(1, m.messageId); + try (ResultSet srs = sps.executeQuery()) { + while (srs.next()) { + MessageStatus s = new MessageStatus(); + s.status = MessageStatusValue.valueOf(srs.getString("status")); + s.timestamp = srs.getTimestamp("status_timestamp"); + m.statusHistory.add(s); + } + } + + aps.setLong(1, m.messageId); + try (ResultSet ars = aps.executeQuery()) { + while (ars.next()) { + MessageAction a = new MessageAction(); + a.actionId = ars.getLong("message_action_id"); + a.action = MessageActionValue.valueOf(ars.getString("action")); + a.actionStatus = ActionStatus.valueOf(ars.getString("action_status")); + a.timestamp = ars.getTimestamp("action_timestamp"); + a.doneTimestamp = ars.getTimestamp("done_timestamp"); + a.holdTime = ars.getInt("hold_time"); + a.returnResponse = new MessageData(); + a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(ars.getString("response_param")); + a.returnResponse.body = ars.getString("response_body"); + if (a.returnResponse.param == null && a.returnResponse.body == null) { + a.returnResponse = null; + } + a.resolution = ars.getString("resolution"); + m.actionHistory.add(a); + } + } + } + } + } + } catch (SQLException e) { + throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e); + } + return messageList; + } + + @SuppressWarnings("unchecked") + @Override + public MessageAction getNextAction(long messageId) { + try (Connection con = dataSource.getConnection()) { + String sql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY action_timestamp DESC"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, messageId); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + MessageAction a = new MessageAction(); + a.actionId = rs.getLong("message_action_id"); + a.action = MessageActionValue.valueOf(rs.getString("action")); + a.actionStatus = ActionStatus.valueOf(rs.getString("action_status")); + a.timestamp = rs.getTimestamp("action_timestamp"); + a.doneTimestamp = rs.getTimestamp("done_timestamp"); + a.holdTime = rs.getInt("hold_time"); + a.returnResponse = new MessageData(); + a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(rs.getString("response_param")); + a.returnResponse.body = rs.getString("response_body"); + if (a.returnResponse.param == null && a.returnResponse.body == null) { + a.returnResponse = null; + } + a.resolution = rs.getString("resolution"); + return a; + } + return null; + } + } + } catch (SQLException e) { + throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e); + } + } + + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java new file mode 100644 index 000000000..4aa016575 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java @@ -0,0 +1,5 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +public enum ActionStatus { + PENDING, DONE +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java new file mode 100644 index 000000000..9960eefbf --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java @@ -0,0 +1,5 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +public enum Event { + ARRIVED, COMPLETED, AWAKEN, CHECK +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java new file mode 100644 index 000000000..1f0b11384 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java @@ -0,0 +1,33 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +import java.util.Date; +import java.util.List; + +public class Message { + + public long messageId; + public String extMessageId; + public MessageData request; + public MessageData response; + public Date arrivedTimestamp; + public Date startedTimestamp; + public Date completedTimestamp; + public Date responseTimestamp; + public Queue queue; + + public List<MessageStatus> statusHistory; + public List<MessageAction> actionHistory; + + @Override + public String toString() { + StringBuilder ss = new StringBuilder(); + ss.append(messageId); + if (extMessageId != null) { + ss.append("::").append(extMessageId); + } + if (queue != null) { + ss.append("::").append(queue); + } + return ss.toString(); + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java new file mode 100644 index 000000000..319fc68ff --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java @@ -0,0 +1,59 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +import java.util.Date; + +public class MessageAction { + + public long actionId; + public MessageActionValue action; + public ActionStatus actionStatus; + public String resolution; + public Date timestamp; + public Date doneTimestamp; + public int holdTime; // in seconds + public MessageData returnResponse; + + @Override + public String toString() { + return action.toString() + " | " + actionStatus + " | " + resolution + " | " + holdTime + " | " + returnResponse; + } + + public boolean same(MessageAction a2) { + if (action != a2.action) { + return false; + } + if (action == MessageActionValue.HOLD || action == MessageActionValue.RETURN_HOLD) { + if (holdTime != a2.holdTime) { + return false; + } + } + if (action == MessageActionValue.RETURN_COMPLETE || action == MessageActionValue.RETURN_HOLD || action == MessageActionValue.RETURN_PROCESS) { + if (returnResponse == null != (a2.returnResponse == null)) { + return false; + } + if (returnResponse != null) { + if (returnResponse.param == null != (a2.returnResponse.param == null)) { + return false; + } + if (returnResponse.param != null && !returnResponse.param.equals(a2.returnResponse.param)) { + return false; + } + if (returnResponse.body == null != (a2.returnResponse.body == null)) { + return false; + } + if (returnResponse.body != null && !returnResponse.body.equals(a2.returnResponse.body)) { + return false; + } + } + } + if (action == MessageActionValue.RETURN_COMPLETE) { + if (resolution == null != (a2.resolution == null)) { + return false; + } + if (resolution != null && !resolution.equals(a2.resolution)) { + return false; + } + } + return true; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java new file mode 100644 index 000000000..fd86d37c8 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java @@ -0,0 +1,5 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +public enum MessageActionValue { + HOLD, PROCESS, RETURN_COMPLETE, RETURN_PROCESS, RETURN_HOLD +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java new file mode 100644 index 000000000..1fa5f9a14 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java @@ -0,0 +1,21 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +import java.util.Map; + +public class MessageData { + + public Map<String, Object> param; + public String body; + + @Override + public String toString() { + StringBuilder ss = new StringBuilder(); + ss.append(param); + String b = body; + if (b != null && b.length() > 20) { + b = b.substring(0, 20) + "..."; + } + ss.append(b); + return ss.toString(); + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java new file mode 100644 index 000000000..e9af20be1 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java @@ -0,0 +1,9 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +import java.util.Date; + +public class MessageStatus { + + public MessageStatusValue status; + public Date timestamp; +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java new file mode 100644 index 000000000..af2bbf024 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java @@ -0,0 +1,5 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +public enum MessageStatusValue { + ARRIVED, IN_QUEUE, PROCESSING_SYNC, PROCESSING_ASYNC, COMPLETED +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java new file mode 100644 index 000000000..7fd83104a --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java @@ -0,0 +1,12 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +public class Queue { + + public String type; + public String id; + + @Override + public String toString() { + return type + "::" + id; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java new file mode 100644 index 000000000..a81942771 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java @@ -0,0 +1,357 @@ +package org.onap.ccsdk.features.lib.doorman.impl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.onap.ccsdk.features.lib.doorman.MessageQueueHandler; +import org.onap.ccsdk.features.lib.doorman.data.Event; +import org.onap.ccsdk.features.lib.doorman.data.Message; +import org.onap.ccsdk.features.lib.doorman.data.MessageAction; +import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatus; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageHandlerBaseImpl implements MessageQueueHandler { + + private static final Logger log = LoggerFactory.getLogger(MessageHandlerBaseImpl.class); + + private boolean async = false; + private int maxParallelCount = 1; + private int maxQueueSize = 0; + private int timeToLive = 0; // in seconds + private int maxTimeInQueue = 60; // in seconds + private int updateWaitTime = 60; // in seconds + + @Override + public Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue) { + long t1 = System.currentTimeMillis(); + log.info(">>>>> Handler started for message: " + msg + ": " + event); + + PrioritizedMessage pmsg = null; + List<PrioritizedMessage> processing = new ArrayList<>(); + List<PrioritizedMessage> waiting = new ArrayList<>(); + Date now = new Date(); + for (Message m : queue) { + PrioritizedMessage pm = new PrioritizedMessage(); + pm.message = m; + pm.priority = assignPriority(msg); + pm.timeInQueue = getTimeInQueue(now, m); + pm.updateGroup = determineUpdateGroup(m); + pm.updateSequence = determineUpdateSequence(m); + + if (pm.message.messageId == msg.messageId) { + pmsg = pm; + + if (event != Event.COMPLETED) { + waiting.add(pm); + } + } else { + MessageStatusValue s = m.statusHistory.get(0).status; + if (s == MessageStatusValue.IN_QUEUE) { + waiting.add(pm); + } else if (s == MessageStatusValue.PROCESSING_SYNC || s == MessageStatusValue.PROCESSING_ASYNC) { + processing.add(pm); + } + } + } + + log.info(" Processing:"); + for (PrioritizedMessage pm : processing) { + log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup + " | " + pm.updateSequence); + } + log.info(" Waiting:"); + for (PrioritizedMessage pm : waiting) { + log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup + " | " + pm.updateSequence); + } + + log.info(" Determined actions:"); + + Collections.sort(waiting, (pm1, pm2) -> comparePrioritizedMessages(pm1, pm2)); + + Map<Long, MessageAction> mm = new HashMap<>(); + + List<PrioritizedMessage> skipList = findMessagesToSkip(processing, waiting); + addNextActionComplete(mm, skipList, Resolution.SKIPPED, now); + + waiting.removeAll(skipList); + + List<PrioritizedMessage> expiredList = findExpiredMessages(processing, waiting); + addNextActionComplete(mm, expiredList, Resolution.EXPIRED, now); + + waiting.removeAll(expiredList); + + List<PrioritizedMessage> dropList = findMessagesToDrop(processing, waiting); + addNextActionComplete(mm, dropList, Resolution.DROPPED, now); + + waiting.removeAll(dropList); + + List<PrioritizedMessage> processList = findMessagesToProcess(processing, waiting); + for (PrioritizedMessage pm : processList) { + MessageAction a = new MessageAction(); + a.timestamp = now; + if (async) { + a.action = MessageActionValue.RETURN_PROCESS; + a.returnResponse = ackResponse(pm.message); + } else { + a.action = MessageActionValue.PROCESS; + } + mm.put(pm.message.messageId, a); + log.info(" -- Next action for message: " + pm.message + ": " + a.action); + } + + if (event != Event.COMPLETED && !mm.containsKey(pmsg.message.messageId)) { + MessageAction a = new MessageAction(); + a.timestamp = now; + a.holdTime = pmsg.updateGroup != null ? updateWaitTime : maxTimeInQueue; + if (async) { + a.action = MessageActionValue.RETURN_HOLD; + a.returnResponse = ackResponse(pmsg.message); + } else { + a.action = MessageActionValue.HOLD; + } + mm.put(pmsg.message.messageId, a); + log.info(" -- Next action for message: " + pmsg.message + ": " + a.action + ": " + a.holdTime); + + waiting.remove(pmsg); + } + + if (event == Event.COMPLETED) { + MessageAction a = new MessageAction(); + a.timestamp = now; + a.action = MessageActionValue.RETURN_COMPLETE; + a.resolution = Resolution.PROCESSED.toString(); + a.returnResponse = completeResponse(Resolution.PROCESSED, msg); + mm.put(pmsg.message.messageId, a); + log.info(" -- Next action for message: " + pmsg.message + ": " + a.action + ": " + a.resolution); + } + + long t2 = System.currentTimeMillis(); + log.info("<<<<< Handler completed for message: " + msg + ": " + event + ": Time: " + (t2 - t1)); + return mm; + } + + private void addNextActionComplete(Map<Long, MessageAction> mm, List<PrioritizedMessage> skipList, Resolution r, Date now) { + for (PrioritizedMessage pm : skipList) { + MessageAction a = new MessageAction(); + a.action = MessageActionValue.RETURN_COMPLETE; + a.resolution = r.toString(); + a.timestamp = now; + a.returnResponse = completeResponse(r, pm.message); + mm.put(pm.message.messageId, a); + log.info(" -- Next action for message: " + pm.message + ": " + a.action + ": " + a.resolution); + } + } + + protected List<PrioritizedMessage> findMessagesToSkip(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) { + List<PrioritizedMessage> ll = new ArrayList<>(); + Map<String, PrioritizedMessage> lastMap = new HashMap<>(); + for (PrioritizedMessage pm : waiting) { + if (pm.updateGroup != null) { + PrioritizedMessage last = lastMap.get(pm.updateGroup); + if (last == null) { + lastMap.put(pm.updateGroup, pm); + } else { + if (pm.updateSequence > last.updateSequence) { + ll.add(last); + lastMap.put(pm.updateGroup, pm); + } + } + } + } + return ll; + } + + protected List<PrioritizedMessage> findExpiredMessages(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) { + List<PrioritizedMessage> ll = new ArrayList<>(); + if (timeToLive > 0) { + for (PrioritizedMessage pm : waiting) { + if (pm.timeInQueue > timeToLive) { + ll.add(pm); + } + } + } + return ll; + } + + protected List<PrioritizedMessage> findMessagesToDrop(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) { + List<PrioritizedMessage> ll = new ArrayList<>(); + if (maxQueueSize > 0) { + // Drop the least important messages (last in the prioritized waiting list) + for (int i = maxQueueSize; i < waiting.size(); i++) { + ll.add(waiting.get(i)); + } + } + return ll; + } + + protected List<PrioritizedMessage> findMessagesToProcess(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) { + List<PrioritizedMessage> ll = new ArrayList<>(); + + if (processing.size() >= maxParallelCount) { + return ll; + } + + // Find messages allowed to be processed based on the concurrency rules + + List<List<String>> currentLocks = new ArrayList<>(); + for (PrioritizedMessage m : processing) { + List<List<String>> locks = determineConcurency(m.message); + if (locks != null) { + currentLocks.addAll(locks); + } + } + + List<PrioritizedMessage> allowed = new ArrayList<>(); + for (PrioritizedMessage m : waiting) { + List<List<String>> neededLocks = determineConcurency(m.message); + if (allowed(currentLocks, neededLocks)) { + allowed.add(m); + } + } + + // Remove messages that are waiting on hold + Iterator<PrioritizedMessage> ii = allowed.iterator(); + while (ii.hasNext()) { + PrioritizedMessage pm = ii.next(); + if (pm.updateGroup != null) { + log.info(" --- Check: " + pm.message + ": " + pm.timeInQueue + " >= " + updateWaitTime); + if (pm.timeInQueue < updateWaitTime) { + ii.remove(); + } + } + } + + // Limit the number of processing messages to maxParallelCount + + for (int i = 0; i < allowed.size() && i < maxParallelCount - processing.size(); i++) { + ll.add(allowed.get(i)); + } + + return ll; + } + + private int comparePrioritizedMessages(PrioritizedMessage pm1, PrioritizedMessage pm2) { + if (pm1.timeInQueue >= maxTimeInQueue && pm2.timeInQueue < maxTimeInQueue) { + return -1; + } + if (pm1.timeInQueue < maxTimeInQueue && pm2.timeInQueue >= maxTimeInQueue) { + return 1; + } + if (pm1.priority < pm2.priority) { + return -1; + } + if (pm1.priority > pm2.priority) { + return 1; + } + if (pm1.timeInQueue > pm2.timeInQueue) { + return -1; + } + if (pm1.timeInQueue < pm2.timeInQueue) { + return 1; + } + return 0; + } + + private int getTimeInQueue(Date now, Message m) { + // Find the elapsed time since message arrived. That will be the timestamp of the first + // status of the message (last status in the status history list) + MessageStatus receivedStatus = m.statusHistory.get(m.statusHistory.size() - 1); + return (int) ((now.getTime() - receivedStatus.timestamp.getTime()) / 1000); + } + + private boolean allowed(List<List<String>> currentLocks, List<List<String>> neededLocks) { + if (neededLocks == null) { + return true; + } + for (List<String> neededLockLevels : neededLocks) { + for (List<String> currentLockLevels : currentLocks) { + int n = neededLockLevels.size() < currentLockLevels.size() ? neededLockLevels.size() : currentLockLevels.size(); + boolean good = false; + for (int i = 0; i < n; i++) { + if (!neededLockLevels.get(i).equals(currentLockLevels.get(i))) { + good = true; + break; + } + } + if (!good) { + return false; + } + } + } + return true; + } + + protected void initMessage(Message msg) { + } + + protected void closeMessage(Message msg) { + } + + protected long assignPriority(Message msg) { + return msg.messageId; // FIFO by default + } + + protected String determineUpdateGroup(Message msg) { + return null; + } + + protected long determineUpdateSequence(Message msg) { + return msg.messageId; + } + + protected List<List<String>> determineConcurency(Message msg) { + return null; + } + + protected MessageData ackResponse(Message msg) { + return null; + } + + protected MessageData completeResponse(Resolution r, Message msg) { + return null; + } + + protected static enum Resolution { + PROCESSED, SKIPPED, EXPIRED, DROPPED + } + + protected static class PrioritizedMessage { + + public Message message; + public long priority; + public int timeInQueue; + public String updateGroup; + public long updateSequence; + } + + public void setMaxParallelCount(int maxParallelCount) { + this.maxParallelCount = maxParallelCount; + } + + public void setTimeToLive(int timeToLive) { + this.timeToLive = timeToLive; + } + + public void setMaxTimeInQueue(int maxTimeInQueue) { + this.maxTimeInQueue = maxTimeInQueue; + } + + public void setUpdateWaitTime(int updateWaitTime) { + this.updateWaitTime = updateWaitTime; + } + + public void setMaxQueueSize(int maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } + + public void setAsync(boolean async) { + this.async = async; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java new file mode 100644 index 000000000..61059e067 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java @@ -0,0 +1,57 @@ +package org.onap.ccsdk.features.lib.doorman.impl; + +import java.util.Map; +import org.onap.ccsdk.features.lib.doorman.MessageClassifier; +import org.onap.ccsdk.features.lib.doorman.MessageInterceptor; +import org.onap.ccsdk.features.lib.doorman.MessageInterceptorFactory; +import org.onap.ccsdk.features.lib.doorman.MessageProcessor; +import org.onap.ccsdk.features.lib.doorman.MessageQueueHandler; +import org.onap.ccsdk.features.lib.doorman.dao.MessageDao; +import org.onap.ccsdk.features.lib.rlock.LockHelper; + +public class MessageInterceptorFactoryImpl implements MessageInterceptorFactory { + + private MessageClassifier messageClassifier; + private Map<String, MessageQueueHandler> handlerMap; + private MessageProcessor messageProcessor; + private MessageDao messageDao; + + private LockHelper lockHelper; + private int lockTimeout; // in seconds + + @Override + public MessageInterceptor create() { + MessageInterceptorImpl mi = new MessageInterceptorImpl(); + mi.setMessageClassifier(messageClassifier); + mi.setHandlerMap(handlerMap); + mi.setMessageProcessor(messageProcessor); + mi.setMessageDao(messageDao); + mi.setLockHelper(lockHelper); + mi.setLockTimeout(lockTimeout); + return mi; + } + + public void setMessageDao(MessageDao messageDao) { + this.messageDao = messageDao; + } + + public void setLockHelper(LockHelper lockHelper) { + this.lockHelper = lockHelper; + } + + public void setLockTimeout(int lockTimeout) { + this.lockTimeout = lockTimeout; + } + + public void setMessageClassifier(MessageClassifier messageClassifier) { + this.messageClassifier = messageClassifier; + } + + public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) { + this.handlerMap = handlerMap; + } + + public void setMessageProcessor(MessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java new file mode 100644 index 000000000..d2ab97101 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java @@ -0,0 +1,336 @@ +package org.onap.ccsdk.features.lib.doorman.impl; + +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; +import org.onap.ccsdk.features.lib.doorman.MessageClassifier; +import org.onap.ccsdk.features.lib.doorman.MessageInterceptor; +import org.onap.ccsdk.features.lib.doorman.MessageProcessor; +import org.onap.ccsdk.features.lib.doorman.MessageQueueHandler; +import org.onap.ccsdk.features.lib.doorman.dao.MessageDao; +import org.onap.ccsdk.features.lib.doorman.data.ActionStatus; +import org.onap.ccsdk.features.lib.doorman.data.Event; +import org.onap.ccsdk.features.lib.doorman.data.Message; +import org.onap.ccsdk.features.lib.doorman.data.MessageAction; +import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatus; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue; +import org.onap.ccsdk.features.lib.doorman.data.Queue; +import org.onap.ccsdk.features.lib.rlock.LockHelper; +import org.onap.ccsdk.features.lib.rlock.SynchronizedFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageInterceptorImpl implements MessageInterceptor { + + private static final Logger log = LoggerFactory.getLogger(MessageInterceptorImpl.class); + + private MessageClassifier messageClassifier; + private Map<String, MessageQueueHandler> handlerMap; + private MessageProcessor messageProcessor; + private MessageDao messageDao; + + private LockHelper lockHelper; + private int lockTimeout = 10; // in seconds + + private Message message = null; + private MessageQueueHandler handler = null; + + @Override + public MessageData processRequest(MessageData request) { + Date now = new Date(); + + Queue q = null; + String extMessageId = null; + if (messageClassifier != null) { + q = messageClassifier.determineQueue(request); + extMessageId = messageClassifier.getExtMessageId(request); + } + + long id = messageDao.addArrivedMessage(extMessageId, request, q, now); + + MessageStatus arrivedStatus = new MessageStatus(); + arrivedStatus.status = MessageStatusValue.ARRIVED; + arrivedStatus.timestamp = now; + messageDao.addStatus(id, arrivedStatus); + + message = new Message(); + message.messageId = id; + message.request = request; + message.arrivedTimestamp = now; + message.queue = q; + message.extMessageId = extMessageId; + + log.info("Message received: " + message); + + if (q != null && handlerMap != null) { + handler = handlerMap.get(q.type); + } + + if (q == null || handler == null) { + processSync(); + return null; // Do nothing, normal message processing + } + + Event event = Event.ARRIVED; + + while (true) { + MessageFunction func = new MessageFunction(event); + func.exec(); + + MessageAction nextAction = func.getNextAction(); + if (nextAction == null) { + processSync(); + return null; + } + + switch (nextAction.action) { + + case PROCESS: + processSync(); + return null; + + case HOLD: { + event = waitForNewAction(nextAction.holdTime); + break; + } + + case RETURN_COMPLETE: + returnComplete(nextAction); + return nextAction.returnResponse; + + case RETURN_PROCESS: + processAsync(nextAction.returnResponse); + return nextAction.returnResponse; + + case RETURN_HOLD: { + returnHold(nextAction); + return nextAction.returnResponse; + } + } + } + } + + private void processSync() { + messageDao.updateMessageStarted(message.messageId, new Date()); + log.info("Message processing started: " + message); + } + + private void processAsync(MessageData returnResponse) { + Thread t = new Thread(() -> processMessage(message.request), message.queue.type + "::" + message.queue.id + "::" + message.messageId); + t.start(); + + messageDao.updateMessageResponse(message.messageId, new Date(), returnResponse); + } + + private void processMessage(MessageData request) { + messageDao.updateMessageStarted(message.messageId, new Date()); + log.info("Message processing started: " + message); + if (messageProcessor != null) { + messageProcessor.processMessage(request); + } + + MessageFunction func = new MessageFunction(Event.COMPLETED); + func.exec(); + MessageAction nextAction = func.getNextAction(); + + messageDao.updateMessageCompleted(message.messageId, nextAction.resolution, new Date()); + log.info("Message processing completed: " + message); + } + + private void returnComplete(MessageAction nextAction) { + Date now = new Date(); + messageDao.updateMessageResponse(message.messageId, now, nextAction.returnResponse); + messageDao.updateMessageCompleted(message.messageId, nextAction.resolution, now); + log.info("Message processing completed: " + message); + } + + private void returnHold(MessageAction nextAction) { + Thread t = new Thread(() -> asyncQueue(nextAction), message.queue.type + "::" + message.queue.id + "::" + message.messageId); + t.start(); + messageDao.updateMessageResponse(message.messageId, new Date(), nextAction.returnResponse); + } + + private void asyncQueue(MessageAction nextAction) { + Event event = waitForNewAction(nextAction.holdTime); + + while (true) { + MessageFunction func = new MessageFunction(event); + func.exec(); + + nextAction = func.getNextAction(); + if (nextAction == null) { + processMessage(message.request); + return; + } + + switch (nextAction.action) { + case PROCESS: + processMessage(message.request); + return; + + case HOLD: { + event = waitForNewAction(nextAction.holdTime); + break; + } + + default: + return; + } + } + } + + private Event waitForNewAction(int holdTime) { + long startTime = System.currentTimeMillis(); + long currentTime = startTime; + while (currentTime - startTime <= (holdTime + 1) * 1000) { + try { + Thread.sleep(5000); + } catch (Exception e) { + } + + MessageAction nextAction = messageDao.getNextAction(message.messageId); + if (nextAction != null && nextAction.action != MessageActionValue.HOLD) { + return Event.AWAKEN; + } + + currentTime = System.currentTimeMillis(); + } + return Event.CHECK; + } + + @Override + public void processResponse(MessageData response) { + if (message == null) { + return; + } + + String resolution = null; + if (message.queue != null && handler != null) { + MessageFunction func = new MessageFunction(Event.COMPLETED); + func.exec(); + MessageAction nextAction = func.getNextAction(); + if (nextAction != null) { + resolution = nextAction.resolution; + } + } + + Date now = new Date(); + messageDao.updateMessageResponse(message.messageId, now, response); + messageDao.updateMessageCompleted(message.messageId, resolution, now); + log.info("Message processing completed: " + message); + } + + private class MessageFunction extends SynchronizedFunction { + + private Event event; + + private MessageAction nextAction = null; // Output + + public MessageFunction(Event event) { + super(lockHelper, Collections.singleton(message.queue.type + "::" + message.queue.id), lockTimeout); + this.event = event; + } + + @Override + protected void _exec() { + List<Message> messageQueue = messageDao.readMessageQueue(message.queue); + if (event == Event.AWAKEN) { + for (Message m : messageQueue) { + if (m.messageId == message.messageId) { + nextAction = m.actionHistory.get(0); + } + } + if (nextAction != null) { + messageDao.updateActionDone(nextAction.actionId, new Date()); + } + } else { + Map<Long, MessageAction> nextActionMap = handler.nextAction(event, message, messageQueue); + if (nextActionMap != null) { + for (Message m : messageQueue) { + MessageAction action = nextActionMap.get(m.messageId); + if (action != null) { + if (m.messageId == message.messageId) { + action.actionStatus = ActionStatus.DONE; + action.doneTimestamp = new Date(); + messageDao.addAction(m.messageId, action); + nextAction = action; + } else { + MessageAction lastAction = m.actionHistory.get(0); + if (lastAction.actionStatus != ActionStatus.PENDING || !action.same(lastAction)) { + action.actionStatus = ActionStatus.PENDING; + messageDao.addAction(m.messageId, action); + } + } + } + } + } + } + if (nextAction != null) { + log.info("Next message action: " + message + ":" + nextAction.action); + MessageStatus status = determineStatus(nextAction); + if (status != null) { + messageDao.addStatus(message.messageId, status); + log.info("Updating message status: " + message + ":" + status.status); + } + } + } + + public MessageAction getNextAction() { + return nextAction; + } + } + + private MessageStatus determineStatus(MessageAction action) { + if (action == null) { + return null; + } + + MessageStatus s = new MessageStatus(); + s.timestamp = new Date(); + + switch (action.action) { + case PROCESS: + s.status = MessageStatusValue.PROCESSING_SYNC; + break; + case HOLD: + case RETURN_HOLD: + s.status = MessageStatusValue.IN_QUEUE; + break; + case RETURN_PROCESS: + s.status = MessageStatusValue.PROCESSING_ASYNC; + break; + case RETURN_COMPLETE: + s.status = MessageStatusValue.COMPLETED; + break; + } + + return s; + } + + public void setMessageDao(MessageDao messageDao) { + this.messageDao = messageDao; + } + + public void setLockHelper(LockHelper lockHelper) { + this.lockHelper = lockHelper; + } + + public void setLockTimeout(int lockTimeout) { + this.lockTimeout = lockTimeout; + } + + public void setMessageClassifier(MessageClassifier messageClassifier) { + this.messageClassifier = messageClassifier; + } + + public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) { + this.handlerMap = handlerMap; + } + + public void setMessageProcessor(MessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java new file mode 100644 index 000000000..73c3d8b6a --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java @@ -0,0 +1,236 @@ +package org.onap.ccsdk.features.lib.doorman.servlet; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.CharArrayWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; +import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.HttpServletResponseWrapper; +import org.onap.ccsdk.features.lib.doorman.MessageInterceptor; +import org.onap.ccsdk.features.lib.doorman.MessageInterceptorFactory; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; + +public class MessageInterceptorFilter implements Filter { + + private MessageInterceptorFactory messageInterceptorFactory; + + @Override + public void init(FilterConfig filterConfig) throws ServletException { + } + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { + RequestWrapper req = new RequestWrapper((HttpServletRequest) request); + MessageData requestData = getMessageRequest(req); + + MessageInterceptor interceptor = messageInterceptorFactory.create(); + MessageData responseData = interceptor.processRequest(requestData); + + if (responseData == null) { + ResponseWrapper res = new ResponseWrapper((HttpServletResponse) response); + + chain.doFilter(req, res); + + responseData = res.getMessageResponse(); + interceptor.processResponse(responseData); + } + + setMessageResponse((HttpServletResponse) response, responseData); + } + + @SuppressWarnings("unchecked") + private void setMessageResponse(HttpServletResponse response, MessageData responseData) throws IOException { + if (responseData.param != null) { + String contentType = (String) responseData.param.get("content_type"); + if (contentType != null) { + response.setContentType(contentType); + } + Integer httpCode = (Integer) responseData.param.get("http_code"); + if (httpCode != null) { + response.setStatus(httpCode); + } + Map<String, Object> headers = (Map<String, Object>) responseData.param.get("headers"); + if (headers != null) { + for (Entry<String, Object> entry : headers.entrySet()) { + String name = entry.getKey(); + Object v = entry.getValue(); + if (v instanceof List) { + List<Object> ll = (List<Object>) v; + for (Object o : ll) { + response.addHeader(name, o.toString()); + } + } else { + response.setHeader(name, v.toString()); + } + } + } + } + + if (responseData.body != null) { + response.setContentLength(responseData.body.length()); + response.getWriter().write(responseData.body); + } + } + + @SuppressWarnings("unchecked") + private MessageData getMessageRequest(RequestWrapper request) { + MessageData requestData = new MessageData(); + requestData.param = new HashMap<>(); + requestData.param.put("http_method", request.getMethod()); + requestData.param.put("uri", request.getPathInfo()); + requestData.param.put("param", request.getParameterMap()); + requestData.param.put("content_type", request.getContentType()); + + Map<String, Object> headers = new HashMap<>(); + Enumeration<String> headerNames = request.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String name = headerNames.nextElement(); + Enumeration<String> values = request.getHeaders(name); + List<String> valueList = new ArrayList<>(); + while (values.hasMoreElements()) { + valueList.add(values.nextElement()); + } + if (valueList.size() > 1) { + headers.put(name, valueList); + } else { + headers.put(name, valueList.get(0)); + } + } + requestData.param.put("headers", headers); + + requestData.body = request.getBody(); + + return requestData; + } + + @Override + public void destroy() { + } + + private static class RequestWrapper extends HttpServletRequestWrapper { + + private final String body; + + public RequestWrapper(HttpServletRequest request) throws IOException { + super(request); + + StringBuilder stringBuilder = new StringBuilder(); + try (InputStream inputStream = request.getInputStream()) { + if (inputStream != null) { + try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) { + char[] charBuffer = new char[128]; + int bytesRead = -1; + while ((bytesRead = bufferedReader.read(charBuffer)) > 0) { + stringBuilder.append(charBuffer, 0, bytesRead); + } + } + } + } + body = stringBuilder.toString(); + } + + @Override + public ServletInputStream getInputStream() throws IOException { + final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body.getBytes()); + ServletInputStream servletInputStream = new ServletInputStream() { + + @Override + public int read() throws IOException { + return byteArrayInputStream.read(); + } + }; + return servletInputStream; + } + + @Override + public BufferedReader getReader() throws IOException { + return new BufferedReader(new InputStreamReader(getInputStream())); + } + + public String getBody() { + return body; + } + } + + public class ResponseWrapper extends HttpServletResponseWrapper { + + private CharArrayWriter writer = new CharArrayWriter(); + private Map<String, Object> param = new HashMap<>(); + + public ResponseWrapper(HttpServletResponse response) { + super(response); + } + + @Override + public PrintWriter getWriter() { + return new PrintWriter(writer); + } + + @Override + public void setStatus(int status) { + param.put("http_code", status); + } + + @SuppressWarnings("unchecked") + @Override + public void setHeader(String name, String value) { + Map<String, Object> headers = (Map<String, Object>) param.get("headers"); + if (headers == null) { + headers = new HashMap<>(); + param.put("headers", headers); + } + headers.put(name, value); + } + + @SuppressWarnings("unchecked") + @Override + public void addHeader(String name, String value) { + Map<String, Object> headers = (Map<String, Object>) param.get("headers"); + if (headers == null) { + headers = new HashMap<>(); + param.put("headers", headers); + } + Object v = headers.get(name); + if (v == null) { + headers.put(name, value); + } else if (v instanceof List) { + ((List<Object>) v).add(value); + } else { + List<Object> ll = new ArrayList<>(); + ll.add(v); + ll.add(value); + headers.put(name, ll); + } + } + + public MessageData getMessageResponse() { + MessageData responseData = new MessageData(); + responseData.param = param; + responseData.body = writer.toString(); + return responseData; + } + } + + public void setMessageInterceptorFactory(MessageInterceptorFactory messageInterceptorFactory) { + this.messageInterceptorFactory = messageInterceptorFactory; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java new file mode 100644 index 000000000..42f1b21e7 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java @@ -0,0 +1,273 @@ +package org.onap.ccsdk.features.lib.doorman.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DataUtil { + + @SuppressWarnings("unused") + private static final Logger log = LoggerFactory.getLogger(DataUtil.class); + + @SuppressWarnings("unchecked") + public static Object get(Object struct, String compositeKey) { + if (struct == null || compositeKey == null) { + return null; + } + + List<Object> keys = splitCompositeKey(compositeKey); + Object currentValue = struct; + String currentKey = ""; + + for (Object key : keys) { + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + + if (keyi >= currentValueL.size()) { + return null; + } + + currentValue = currentValueL.get(keyi); + if (currentValue == null) { + return null; + } + + currentKey += "[" + key + "]"; + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); + } + + currentValue = ((Map<String, Object>) currentValue).get(key); + if (currentValue == null) { + return null; + } + + currentKey += "." + key; + } + } + return currentValue; + } + + @SuppressWarnings("unchecked") + public static void set(Object struct, String compositeKey, Object value) { + if (struct == null) { + throw new IllegalArgumentException("Null argument: struct"); + } + + if (compositeKey == null || compositeKey.length() == 0) { + throw new IllegalArgumentException("Null or empty argument: compositeKey"); + } + + if (value == null) { + return; + } + + List<Object> keys = splitCompositeKey(compositeKey); + Object currentValue = struct; + String currentKey = ""; + + for (int i = 0; i < keys.size() - 1; i++) { + Object key = keys.get(i); + + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + int size = currentValueL.size(); + + if (keyi >= size) { + for (int k = 0; k < keyi - size + 1; k++) { + currentValueL.add(null); + } + } + + Object newValue = currentValueL.get(keyi); + if (newValue == null) { + Object nextKey = keys.get(i + 1); + if (nextKey instanceof Integer) { + newValue = new ArrayList<>(); + } else { + newValue = new HashMap<>(); + } + currentValueL.set(keyi, newValue); + } + + currentValue = newValue; + currentKey += "[" + key + "]"; + + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); + } + + Object newValue = ((Map<String, Object>) currentValue).get(key); + if (newValue == null) { + Object nextKey = keys.get(i + 1); + if (nextKey instanceof Integer) { + newValue = new ArrayList<>(); + } else { + newValue = new HashMap<>(); + } + ((Map<String, Object>) currentValue).put((String) key, newValue); + } + + currentValue = newValue; + currentKey += "." + key; + } + } + + Object key = keys.get(keys.size() - 1); + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + int size = currentValueL.size(); + + if (keyi >= size) { + for (int k = 0; k < keyi - size + 1; k++) { + currentValueL.add(null); + } + } + + currentValueL.set(keyi, value); + + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); + } + + ((Map<String, Object>) currentValue).put((String) key, value); + } + } + + @SuppressWarnings("unchecked") + public static void delete(Object struct, String compositeKey) { + if (struct == null) { + throw new IllegalArgumentException("Null argument: struct"); + } + + if (compositeKey == null) { + throw new IllegalArgumentException("Null argument: compositeKey"); + } + + List<Object> keys = splitCompositeKey(compositeKey); + Object currentValue = struct; + String currentKey = ""; + + for (int i = 0; i < keys.size() - 1; i++) { + Object key = keys.get(i); + + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + + if (keyi >= currentValueL.size()) { + return; + } + + currentValue = currentValueL.get(keyi); + if (currentValue == null) { + return; + } + + currentKey += "[" + key + "]"; + + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); + } + + currentValue = ((Map<String, Object>) currentValue).get(key); + if (currentValue == null) { + return; + } + + currentKey += "." + key; + } + } + + Object key = keys.get(keys.size() - 1); + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + + if (keyi < currentValueL.size()) { + currentValueL.remove(keyi); + } + + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); + } + + ((Map<String, Object>) currentValue).remove(key); + } + } + + private static List<Object> splitCompositeKey(String compositeKey) { + if (compositeKey == null) { + return Collections.emptyList(); + } + + String[] ss = compositeKey.split("\\."); + List<Object> ll = new ArrayList<>(); + for (String s : ss) { + if (s.length() == 0) { + continue; + } + + int i1 = s.indexOf('['); + if (i1 < 0) { + ll.add(s); + } else { + if (!s.endsWith("]")) { + throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": No matching ] found"); + } + + String s1 = s.substring(0, i1); + if (s1.length() > 0) { + ll.add(s1); + } + + String s2 = s.substring(i1 + 1, s.length() - 1); + try { + int n = Integer.parseInt(s2); + if (n < 0) { + throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": Index must be >= 0: " + n); + } + + ll.add(n); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": Index not a number: " + s2); + } + } + } + + return ll; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java new file mode 100644 index 000000000..fc9ce0936 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java @@ -0,0 +1,319 @@ +package org.onap.ccsdk.features.lib.doorman.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class JsonUtil { + + public static Object jsonToData(String s) { + if (s == null) { + return null; + } + return jsonToData(new Current(s)); + } + + private static class Current { + + public String s; + public int i; + public int line, pos; + + public Current(String s) { + this.s = s; + i = 0; + line = 1; + pos = 1; + } + + public void move() { + i++; + pos++; + } + + public void move(int k) { + i += k; + pos += k; + } + + public boolean end() { + return i >= s.length(); + } + + public char get() { + return s.charAt(i); + } + + public boolean is(String ss) { + return i < s.length() - ss.length() && s.substring(i, i + ss.length()).equals(ss); + } + + public boolean is(char c) { + return i < s.length() && s.charAt(i) == c; + } + + public void skipWhiteSpace() { + while (i < s.length() && s.charAt(i) <= 32) { + char cc = s.charAt(i); + if (cc == '\n') { + line++; + pos = 1; + } else if (cc == ' ' || cc == '\t') { + pos++; + } + i++; + } + } + } + + public static Object jsonToData(Current c) { + c.skipWhiteSpace(); + + if (c.end()) { + return ""; + } + + char cc = c.get(); + if (cc == '{') { + c.move(); + return jsonToMap(c); + } + if (cc == '[') { + c.move(); + return jsonToList(c); + } + return jsonToObject(c); + } + + private static Object jsonToObject(Current c) { + if (c.is('"')) { + c.move(); + StringBuilder ss = new StringBuilder(); + while (!c.end() && c.get() != '"') { + if (c.get() == '\\') { + c.move(); + char cc = c.get(); + switch (cc) { + case '\\': + ss.append('\\'); + break; + case '"': + ss.append('\"'); + break; + case 'n': + ss.append('\n'); + break; + case 'r': + ss.append('\r'); + break; + case 't': + ss.append('\t'); + break; + default: + throw new IllegalArgumentException("JSON parsing error: Invalid escaped character at (" + c.line + ':' + c.pos + "): " + cc); + } + } else { + ss.append(c.get()); + } + c.move(); + } + if (!c.end()) { + c.move(); // Skip the closing " + } + return ss.toString(); + } + if (c.is("true")) { + c.move(4); + return true; + } + if (c.is("false")) { + c.move(5); + return false; + } + if (c.is("null")) { + c.move(4); + return null; + } + StringBuilder ss = new StringBuilder(); + while (!c.end() && c.get() != ',' && c.get() != ']' && c.get() != '}' && c.get() != '\n' && c.get() != '\t') { + ss.append(c.get()); + c.move(); + } + try { + return Long.valueOf(ss.toString()); + } catch (Exception e1) { + try { + return Double.valueOf(ss.toString()); + } catch (Exception e2) { + throw new IllegalArgumentException("JSON parsing error: Invalid value at (" + c.line + ':' + c.pos + "): " + ss.toString()); + } + } + } + + private static List<Object> jsonToList(Current c) { + List<Object> ll = new ArrayList<>(); + c.skipWhiteSpace(); + while (!c.end() && c.get() != ']') { + Object value = jsonToData(c); + + ll.add(value); + + c.skipWhiteSpace(); + if (c.is(',')) { + c.move(); + c.skipWhiteSpace(); + } + } + + if (!c.end()) { + c.move(); // Skip the closing ] + } + + return ll; + } + + private static Map<String, Object> jsonToMap(Current c) { + Map<String, Object> mm = new HashMap<>(); + c.skipWhiteSpace(); + while (!c.end() && c.get() != '}') { + Object key = jsonToObject(c); + if (!(key instanceof String)) { + throw new IllegalArgumentException("JSON parsing error: Invalid key at (" + c.line + ':' + c.pos + "): " + key); + } + + c.skipWhiteSpace(); + if (!c.is(':')) { + throw new IllegalArgumentException("JSON parsing error: Expected ':' at (" + c.line + ':' + c.pos + ")"); + } + + c.move(); // Skip the : + Object value = jsonToData(c); + + mm.put((String) key, value); + + c.skipWhiteSpace(); + if (c.is(',')) { + c.move(); + c.skipWhiteSpace(); + } + } + + if (!c.end()) { + c.move(); // Skip the closing } + } + + return mm; + } + + public static String dataToJson(Object o, boolean escape) { + StringBuilder sb = new StringBuilder(); + str(sb, o, 0, false, escape); + return sb.toString(); + } + + public static String dataToJson(Object o) { + return dataToJson(o, true); + } + + @SuppressWarnings("unchecked") + private static void str(StringBuilder ss, Object o, int indent, boolean padFirst, boolean escape) { + if (o == null || o instanceof Boolean || o instanceof Number) { + if (padFirst) { + ss.append(pad(indent)); + } + ss.append(o); + return; + } + + if (o instanceof String) { + String s = escape ? escapeJson((String) o) : (String) o; + if (padFirst) { + ss.append(pad(indent)); + } + ss.append('"').append(s).append('"'); + return; + } + + if (o instanceof Number || o instanceof Boolean) { + if (padFirst) { + ss.append(pad(indent)); + } + ss.append(o); + return; + } + + if (o instanceof Map) { + Map<String, Object> mm = (Map<String, Object>) o; + + if (padFirst) { + ss.append(pad(indent)); + } + ss.append("{\n"); + + boolean first = true; + for (String k : mm.keySet()) { + if (!first) { + ss.append(",\n"); + } + first = false; + + Object v = mm.get(k); + ss.append(pad(indent + 1)); + if (escape) { + ss.append('"'); + } + ss.append(k); + if (escape) { + ss.append('"'); + } + ss.append(": "); + str(ss, v, indent + 1, false, escape); + } + + ss.append("\n"); + ss.append(pad(indent)).append('}'); + + return; + } + + if (o instanceof List) { + List<Object> ll = (List<Object>) o; + + if (padFirst) { + ss.append(pad(indent)); + } + ss.append("[\n"); + + boolean first = true; + for (Object o1 : ll) { + if (!first) { + ss.append(",\n"); + } + first = false; + + str(ss, o1, indent + 1, true, escape); + } + + ss.append("\n"); + ss.append(pad(indent)).append(']'); + } + } + + private static String escapeJson(String v) { + String s = v.replaceAll("\\\\", "\\\\\\\\"); + s = s.replaceAll("\"", "\\\\\""); + s = s.replaceAll("\n", "\\\\n"); + s = s.replaceAll("\r", "\\\\r"); + s = s.replaceAll("\t", "\\\\t"); + return s; + } + + private static String pad(int n) { + String s = ""; + for (int i = 0; i < n; i++) { + s += " "; + } + return s; + } +} |