diff options
56 files changed, 4028 insertions, 0 deletions
diff --git a/lib/doorman/pom.xml b/lib/doorman/pom.xml new file mode 100644 index 000000000..e7883a944 --- /dev/null +++ b/lib/doorman/pom.xml @@ -0,0 +1,54 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.ccsdk.features</groupId> + <artifactId>ccsdk-features</artifactId> + <version>1.0.0-SNAPSHOT</version> + </parent> + + <groupId>org.onap.ccsdk.features.lib.doorman</groupId> + <artifactId>doorman</artifactId> + + <description>Doorman - Request prioritization and agregation queue</description> + + <dependencies> + + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.onap.ccsdk.features.lib.rlock</groupId> + <artifactId>rlock</artifactId> + <version>1.0.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java new file mode 100644 index 000000000..b6af1731e --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageClassifier.java @@ -0,0 +1,11 @@ +package org.onap.ccsdk.features.lib.doorman; + +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.data.Queue; + +public interface MessageClassifier { + + Queue determineQueue(MessageData request); + + String getExtMessageId(MessageData request); +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java new file mode 100644 index 000000000..1ff811baa --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptor.java @@ -0,0 +1,10 @@ +package org.onap.ccsdk.features.lib.doorman; + +import org.onap.ccsdk.features.lib.doorman.data.MessageData; + +public interface MessageInterceptor { + + MessageData processRequest(MessageData request); + + void processResponse(MessageData response); +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java new file mode 100644 index 000000000..4c9886425 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageInterceptorFactory.java @@ -0,0 +1,6 @@ +package org.onap.ccsdk.features.lib.doorman; + +public interface MessageInterceptorFactory { + + MessageInterceptor create(); +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java new file mode 100644 index 000000000..0bb05a34c --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageProcessor.java @@ -0,0 +1,8 @@ +package org.onap.ccsdk.features.lib.doorman; + +import org.onap.ccsdk.features.lib.doorman.data.MessageData; + +public interface MessageProcessor { + + void processMessage(MessageData request); +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java new file mode 100644 index 000000000..4285393a4 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/MessageQueueHandler.java @@ -0,0 +1,12 @@ +package org.onap.ccsdk.features.lib.doorman; + +import java.util.List; +import java.util.Map; +import org.onap.ccsdk.features.lib.doorman.data.Event; +import org.onap.ccsdk.features.lib.doorman.data.Message; +import org.onap.ccsdk.features.lib.doorman.data.MessageAction; + +public interface MessageQueueHandler { + + Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue); +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java new file mode 100644 index 000000000..764faa9c8 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDao.java @@ -0,0 +1,30 @@ +package org.onap.ccsdk.features.lib.doorman.dao; + +import java.util.Date; +import java.util.List; +import org.onap.ccsdk.features.lib.doorman.data.Message; +import org.onap.ccsdk.features.lib.doorman.data.MessageAction; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatus; +import org.onap.ccsdk.features.lib.doorman.data.Queue; + +public interface MessageDao { + + long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp); + + void updateMessageStarted(long messageId, Date timestamp); + + void updateMessageCompleted(long messageId, String resolution, Date timestamp); + + void updateMessageResponse(long messageId, Date timestamp, MessageData response); + + void addStatus(long messageId, MessageStatus status); + + void addAction(long messageId, MessageAction action); + + void updateActionDone(long actionId, Date now); + + List<Message> readMessageQueue(Queue queue); + + MessageAction getNextAction(long messageId); +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java new file mode 100644 index 000000000..b3ecf3d4b --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/dao/MessageDaoImpl.java @@ -0,0 +1,307 @@ +package org.onap.ccsdk.features.lib.doorman.dao; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import javax.sql.DataSource; +import org.onap.ccsdk.features.lib.doorman.data.ActionStatus; +import org.onap.ccsdk.features.lib.doorman.data.Message; +import org.onap.ccsdk.features.lib.doorman.data.MessageAction; +import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatus; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue; +import org.onap.ccsdk.features.lib.doorman.data.Queue; +import org.onap.ccsdk.features.lib.doorman.util.JsonUtil; + +public class MessageDaoImpl implements MessageDao { + + private DataSource dataSource; + + @Override + public long addArrivedMessage(String extMessageId, MessageData request, Queue queue, Date timestamp) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + long id = 0; + String sql = "INSERT INTO message (ext_message_id, request_param, request_body, arrived_timestamp, queue_type, queue_id) VALUES (?, ?, ?, ?, ?, ?)"; + try (PreparedStatement ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { + ps.setString(1, extMessageId); + ps.setString(2, JsonUtil.dataToJson(request.param)); + ps.setString(3, request.body); + ps.setTimestamp(4, new Timestamp(timestamp.getTime())); + if (queue != null) { + ps.setString(5, queue.type); + ps.setString(6, queue.id); + } else { + ps.setNull(5, Types.VARCHAR); + ps.setNull(6, Types.VARCHAR); + } + ps.executeUpdate(); + try (ResultSet rs = ps.getGeneratedKeys()) { + rs.next(); + id = rs.getLong(1); + } + } + con.commit(); + return id; + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error inserting message to DB: " + e.getMessage(), e); + } + } + + @Override + public void updateMessageStarted(long messageId, Date timestamp) { + updateMessageStatus("started_timestamp", messageId, null, timestamp); + } + + @Override + public void updateMessageCompleted(long messageId, String resolution, Date timestamp) { + updateMessageStatus("completed_timestamp", messageId, resolution, timestamp); + } + + private void updateMessageStatus(String timestampColumn, long messageId, String resolution, Date timestamp) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "UPDATE message SET " + timestampColumn + " = ? WHERE message_id = ?"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setTimestamp(1, new Timestamp(timestamp.getTime())); + ps.setLong(2, messageId); + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error updating message status in DB: " + e.getMessage(), e); + } + } + + @Override + public void updateMessageResponse(long messageId, Date timestamp, MessageData response) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "UPDATE message SET response_timestamp = ?, response_param = ?, response_body = ? WHERE message_id = ?"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setTimestamp(1, new Timestamp(timestamp.getTime())); + ps.setString(2, JsonUtil.dataToJson(response.param)); + ps.setString(3, response.body); + ps.setLong(4, messageId); + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error updating message response in DB: " + e.getMessage(), e); + } + } + + @Override + public void addStatus(long messageId, MessageStatus status) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "INSERT INTO message_status (message_id, status, status_timestamp) VALUES (?, ?, ?)"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, messageId); + ps.setString(2, status.status.toString()); + ps.setTimestamp(3, new Timestamp(status.timestamp.getTime())); + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error inserting message status to DB: " + e.getMessage(), e); + } + } + + @Override + public void addAction(long messageId, MessageAction action) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "INSERT INTO message_action (message_id, action, action_status, resolution, action_timestamp, done_timestamp, hold_time, response_param, response_body) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, messageId); + ps.setString(2, action.action.toString()); + ps.setString(3, action.actionStatus.toString()); + ps.setString(4, action.resolution); + ps.setTimestamp(5, new Timestamp(action.timestamp.getTime())); + if (action.doneTimestamp != null) { + ps.setTimestamp(6, new Timestamp(action.doneTimestamp.getTime())); + } else { + ps.setNull(6, Types.TIMESTAMP); + } + ps.setInt(7, action.holdTime); + if (action.returnResponse != null) { + ps.setString(8, JsonUtil.dataToJson(action.returnResponse.param)); + ps.setString(9, action.returnResponse.body); + } else { + ps.setNull(8, Types.VARCHAR); + ps.setNull(9, Types.VARCHAR); + } + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error inserting message action to DB: " + e.getMessage(), e); + } + } + + @Override + public void updateActionDone(long actionId, Date now) { + try (Connection con = dataSource.getConnection()) { + try { + con.setAutoCommit(false); + String sql = "UPDATE message_action SET action_status = ?, done_timestamp = ? WHERE message_action_id = ?"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setString(1, ActionStatus.DONE.toString()); + ps.setTimestamp(2, new Timestamp(now.getTime())); + ps.setLong(3, actionId); + ps.executeUpdate(); + } + con.commit(); + } catch (SQLException ex) { + con.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException("Error updating action in DB: " + e.getMessage(), e); + } + } + + @SuppressWarnings("unchecked") + @Override + public List<Message> readMessageQueue(Queue queue) { + List<Message> messageList = new ArrayList<>(); + try (Connection con = dataSource.getConnection()) { + String msql = "SELECT * FROM message WHERE queue_type = ? AND queue_id = ?"; + String ssql = "SELECT * FROM message_status WHERE message_id = ? ORDER BY message_status_id DESC"; + String asql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY message_action_id DESC"; + try (PreparedStatement mps = con.prepareStatement(msql); PreparedStatement sps = con.prepareStatement(ssql); PreparedStatement aps = con.prepareStatement(asql)) { + mps.setString(1, queue.type); + mps.setString(2, queue.id); + try (ResultSet mrs = mps.executeQuery()) { + while (mrs.next()) { + Message m = new Message(); + m.messageId = mrs.getLong("message_id"); + m.extMessageId = mrs.getString("ext_message_id"); + m.request = new MessageData(); + m.request.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("request_param")); + m.request.body = mrs.getString("request_body"); + m.response = new MessageData(); + m.response.param = (Map<String, Object>) JsonUtil.jsonToData(mrs.getString("response_param")); + m.response.body = mrs.getString("response_body"); + m.queue = new Queue(); + m.queue.type = mrs.getString("queue_type"); + m.queue.id = mrs.getString("queue_id"); + m.arrivedTimestamp = mrs.getTimestamp("arrived_timestamp"); + m.startedTimestamp = mrs.getTimestamp("started_timestamp"); + m.completedTimestamp = mrs.getTimestamp("completed_timestamp"); + m.responseTimestamp = mrs.getTimestamp("response_timestamp"); + m.statusHistory = new ArrayList<>(); + m.actionHistory = new ArrayList<>(); + messageList.add(m); + + sps.setLong(1, m.messageId); + try (ResultSet srs = sps.executeQuery()) { + while (srs.next()) { + MessageStatus s = new MessageStatus(); + s.status = MessageStatusValue.valueOf(srs.getString("status")); + s.timestamp = srs.getTimestamp("status_timestamp"); + m.statusHistory.add(s); + } + } + + aps.setLong(1, m.messageId); + try (ResultSet ars = aps.executeQuery()) { + while (ars.next()) { + MessageAction a = new MessageAction(); + a.actionId = ars.getLong("message_action_id"); + a.action = MessageActionValue.valueOf(ars.getString("action")); + a.actionStatus = ActionStatus.valueOf(ars.getString("action_status")); + a.timestamp = ars.getTimestamp("action_timestamp"); + a.doneTimestamp = ars.getTimestamp("done_timestamp"); + a.holdTime = ars.getInt("hold_time"); + a.returnResponse = new MessageData(); + a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(ars.getString("response_param")); + a.returnResponse.body = ars.getString("response_body"); + if (a.returnResponse.param == null && a.returnResponse.body == null) { + a.returnResponse = null; + } + a.resolution = ars.getString("resolution"); + m.actionHistory.add(a); + } + } + } + } + } + } catch (SQLException e) { + throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e); + } + return messageList; + } + + @SuppressWarnings("unchecked") + @Override + public MessageAction getNextAction(long messageId) { + try (Connection con = dataSource.getConnection()) { + String sql = "SELECT * FROM message_action WHERE message_id = ? ORDER BY action_timestamp DESC"; + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, messageId); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + MessageAction a = new MessageAction(); + a.actionId = rs.getLong("message_action_id"); + a.action = MessageActionValue.valueOf(rs.getString("action")); + a.actionStatus = ActionStatus.valueOf(rs.getString("action_status")); + a.timestamp = rs.getTimestamp("action_timestamp"); + a.doneTimestamp = rs.getTimestamp("done_timestamp"); + a.holdTime = rs.getInt("hold_time"); + a.returnResponse = new MessageData(); + a.returnResponse.param = (Map<String, Object>) JsonUtil.jsonToData(rs.getString("response_param")); + a.returnResponse.body = rs.getString("response_body"); + if (a.returnResponse.param == null && a.returnResponse.body == null) { + a.returnResponse = null; + } + a.resolution = rs.getString("resolution"); + return a; + } + return null; + } + } + } catch (SQLException e) { + throw new RuntimeException("Error reading message action from DB: " + e.getMessage(), e); + } + } + + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java new file mode 100644 index 000000000..4aa016575 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/ActionStatus.java @@ -0,0 +1,5 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +public enum ActionStatus { + PENDING, DONE +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java new file mode 100644 index 000000000..9960eefbf --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Event.java @@ -0,0 +1,5 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +public enum Event { + ARRIVED, COMPLETED, AWAKEN, CHECK +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java new file mode 100644 index 000000000..1f0b11384 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Message.java @@ -0,0 +1,33 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +import java.util.Date; +import java.util.List; + +public class Message { + + public long messageId; + public String extMessageId; + public MessageData request; + public MessageData response; + public Date arrivedTimestamp; + public Date startedTimestamp; + public Date completedTimestamp; + public Date responseTimestamp; + public Queue queue; + + public List<MessageStatus> statusHistory; + public List<MessageAction> actionHistory; + + @Override + public String toString() { + StringBuilder ss = new StringBuilder(); + ss.append(messageId); + if (extMessageId != null) { + ss.append("::").append(extMessageId); + } + if (queue != null) { + ss.append("::").append(queue); + } + return ss.toString(); + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java new file mode 100644 index 000000000..319fc68ff --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageAction.java @@ -0,0 +1,59 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +import java.util.Date; + +public class MessageAction { + + public long actionId; + public MessageActionValue action; + public ActionStatus actionStatus; + public String resolution; + public Date timestamp; + public Date doneTimestamp; + public int holdTime; // in seconds + public MessageData returnResponse; + + @Override + public String toString() { + return action.toString() + " | " + actionStatus + " | " + resolution + " | " + holdTime + " | " + returnResponse; + } + + public boolean same(MessageAction a2) { + if (action != a2.action) { + return false; + } + if (action == MessageActionValue.HOLD || action == MessageActionValue.RETURN_HOLD) { + if (holdTime != a2.holdTime) { + return false; + } + } + if (action == MessageActionValue.RETURN_COMPLETE || action == MessageActionValue.RETURN_HOLD || action == MessageActionValue.RETURN_PROCESS) { + if (returnResponse == null != (a2.returnResponse == null)) { + return false; + } + if (returnResponse != null) { + if (returnResponse.param == null != (a2.returnResponse.param == null)) { + return false; + } + if (returnResponse.param != null && !returnResponse.param.equals(a2.returnResponse.param)) { + return false; + } + if (returnResponse.body == null != (a2.returnResponse.body == null)) { + return false; + } + if (returnResponse.body != null && !returnResponse.body.equals(a2.returnResponse.body)) { + return false; + } + } + } + if (action == MessageActionValue.RETURN_COMPLETE) { + if (resolution == null != (a2.resolution == null)) { + return false; + } + if (resolution != null && !resolution.equals(a2.resolution)) { + return false; + } + } + return true; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java new file mode 100644 index 000000000..fd86d37c8 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageActionValue.java @@ -0,0 +1,5 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +public enum MessageActionValue { + HOLD, PROCESS, RETURN_COMPLETE, RETURN_PROCESS, RETURN_HOLD +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java new file mode 100644 index 000000000..1fa5f9a14 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageData.java @@ -0,0 +1,21 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +import java.util.Map; + +public class MessageData { + + public Map<String, Object> param; + public String body; + + @Override + public String toString() { + StringBuilder ss = new StringBuilder(); + ss.append(param); + String b = body; + if (b != null && b.length() > 20) { + b = b.substring(0, 20) + "..."; + } + ss.append(b); + return ss.toString(); + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java new file mode 100644 index 000000000..e9af20be1 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatus.java @@ -0,0 +1,9 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +import java.util.Date; + +public class MessageStatus { + + public MessageStatusValue status; + public Date timestamp; +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java new file mode 100644 index 000000000..af2bbf024 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/MessageStatusValue.java @@ -0,0 +1,5 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +public enum MessageStatusValue { + ARRIVED, IN_QUEUE, PROCESSING_SYNC, PROCESSING_ASYNC, COMPLETED +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java new file mode 100644 index 000000000..7fd83104a --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/data/Queue.java @@ -0,0 +1,12 @@ +package org.onap.ccsdk.features.lib.doorman.data; + +public class Queue { + + public String type; + public String id; + + @Override + public String toString() { + return type + "::" + id; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java new file mode 100644 index 000000000..a81942771 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageHandlerBaseImpl.java @@ -0,0 +1,357 @@ +package org.onap.ccsdk.features.lib.doorman.impl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.onap.ccsdk.features.lib.doorman.MessageQueueHandler; +import org.onap.ccsdk.features.lib.doorman.data.Event; +import org.onap.ccsdk.features.lib.doorman.data.Message; +import org.onap.ccsdk.features.lib.doorman.data.MessageAction; +import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatus; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageHandlerBaseImpl implements MessageQueueHandler { + + private static final Logger log = LoggerFactory.getLogger(MessageHandlerBaseImpl.class); + + private boolean async = false; + private int maxParallelCount = 1; + private int maxQueueSize = 0; + private int timeToLive = 0; // in seconds + private int maxTimeInQueue = 60; // in seconds + private int updateWaitTime = 60; // in seconds + + @Override + public Map<Long, MessageAction> nextAction(Event event, Message msg, List<Message> queue) { + long t1 = System.currentTimeMillis(); + log.info(">>>>> Handler started for message: " + msg + ": " + event); + + PrioritizedMessage pmsg = null; + List<PrioritizedMessage> processing = new ArrayList<>(); + List<PrioritizedMessage> waiting = new ArrayList<>(); + Date now = new Date(); + for (Message m : queue) { + PrioritizedMessage pm = new PrioritizedMessage(); + pm.message = m; + pm.priority = assignPriority(msg); + pm.timeInQueue = getTimeInQueue(now, m); + pm.updateGroup = determineUpdateGroup(m); + pm.updateSequence = determineUpdateSequence(m); + + if (pm.message.messageId == msg.messageId) { + pmsg = pm; + + if (event != Event.COMPLETED) { + waiting.add(pm); + } + } else { + MessageStatusValue s = m.statusHistory.get(0).status; + if (s == MessageStatusValue.IN_QUEUE) { + waiting.add(pm); + } else if (s == MessageStatusValue.PROCESSING_SYNC || s == MessageStatusValue.PROCESSING_ASYNC) { + processing.add(pm); + } + } + } + + log.info(" Processing:"); + for (PrioritizedMessage pm : processing) { + log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup + " | " + pm.updateSequence); + } + log.info(" Waiting:"); + for (PrioritizedMessage pm : waiting) { + log.info(" -- " + pm.message + " | " + pm.priority + " | " + pm.timeInQueue + " | " + pm.updateGroup + " | " + pm.updateSequence); + } + + log.info(" Determined actions:"); + + Collections.sort(waiting, (pm1, pm2) -> comparePrioritizedMessages(pm1, pm2)); + + Map<Long, MessageAction> mm = new HashMap<>(); + + List<PrioritizedMessage> skipList = findMessagesToSkip(processing, waiting); + addNextActionComplete(mm, skipList, Resolution.SKIPPED, now); + + waiting.removeAll(skipList); + + List<PrioritizedMessage> expiredList = findExpiredMessages(processing, waiting); + addNextActionComplete(mm, expiredList, Resolution.EXPIRED, now); + + waiting.removeAll(expiredList); + + List<PrioritizedMessage> dropList = findMessagesToDrop(processing, waiting); + addNextActionComplete(mm, dropList, Resolution.DROPPED, now); + + waiting.removeAll(dropList); + + List<PrioritizedMessage> processList = findMessagesToProcess(processing, waiting); + for (PrioritizedMessage pm : processList) { + MessageAction a = new MessageAction(); + a.timestamp = now; + if (async) { + a.action = MessageActionValue.RETURN_PROCESS; + a.returnResponse = ackResponse(pm.message); + } else { + a.action = MessageActionValue.PROCESS; + } + mm.put(pm.message.messageId, a); + log.info(" -- Next action for message: " + pm.message + ": " + a.action); + } + + if (event != Event.COMPLETED && !mm.containsKey(pmsg.message.messageId)) { + MessageAction a = new MessageAction(); + a.timestamp = now; + a.holdTime = pmsg.updateGroup != null ? updateWaitTime : maxTimeInQueue; + if (async) { + a.action = MessageActionValue.RETURN_HOLD; + a.returnResponse = ackResponse(pmsg.message); + } else { + a.action = MessageActionValue.HOLD; + } + mm.put(pmsg.message.messageId, a); + log.info(" -- Next action for message: " + pmsg.message + ": " + a.action + ": " + a.holdTime); + + waiting.remove(pmsg); + } + + if (event == Event.COMPLETED) { + MessageAction a = new MessageAction(); + a.timestamp = now; + a.action = MessageActionValue.RETURN_COMPLETE; + a.resolution = Resolution.PROCESSED.toString(); + a.returnResponse = completeResponse(Resolution.PROCESSED, msg); + mm.put(pmsg.message.messageId, a); + log.info(" -- Next action for message: " + pmsg.message + ": " + a.action + ": " + a.resolution); + } + + long t2 = System.currentTimeMillis(); + log.info("<<<<< Handler completed for message: " + msg + ": " + event + ": Time: " + (t2 - t1)); + return mm; + } + + private void addNextActionComplete(Map<Long, MessageAction> mm, List<PrioritizedMessage> skipList, Resolution r, Date now) { + for (PrioritizedMessage pm : skipList) { + MessageAction a = new MessageAction(); + a.action = MessageActionValue.RETURN_COMPLETE; + a.resolution = r.toString(); + a.timestamp = now; + a.returnResponse = completeResponse(r, pm.message); + mm.put(pm.message.messageId, a); + log.info(" -- Next action for message: " + pm.message + ": " + a.action + ": " + a.resolution); + } + } + + protected List<PrioritizedMessage> findMessagesToSkip(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) { + List<PrioritizedMessage> ll = new ArrayList<>(); + Map<String, PrioritizedMessage> lastMap = new HashMap<>(); + for (PrioritizedMessage pm : waiting) { + if (pm.updateGroup != null) { + PrioritizedMessage last = lastMap.get(pm.updateGroup); + if (last == null) { + lastMap.put(pm.updateGroup, pm); + } else { + if (pm.updateSequence > last.updateSequence) { + ll.add(last); + lastMap.put(pm.updateGroup, pm); + } + } + } + } + return ll; + } + + protected List<PrioritizedMessage> findExpiredMessages(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) { + List<PrioritizedMessage> ll = new ArrayList<>(); + if (timeToLive > 0) { + for (PrioritizedMessage pm : waiting) { + if (pm.timeInQueue > timeToLive) { + ll.add(pm); + } + } + } + return ll; + } + + protected List<PrioritizedMessage> findMessagesToDrop(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) { + List<PrioritizedMessage> ll = new ArrayList<>(); + if (maxQueueSize > 0) { + // Drop the least important messages (last in the prioritized waiting list) + for (int i = maxQueueSize; i < waiting.size(); i++) { + ll.add(waiting.get(i)); + } + } + return ll; + } + + protected List<PrioritizedMessage> findMessagesToProcess(List<PrioritizedMessage> processing, List<PrioritizedMessage> waiting) { + List<PrioritizedMessage> ll = new ArrayList<>(); + + if (processing.size() >= maxParallelCount) { + return ll; + } + + // Find messages allowed to be processed based on the concurrency rules + + List<List<String>> currentLocks = new ArrayList<>(); + for (PrioritizedMessage m : processing) { + List<List<String>> locks = determineConcurency(m.message); + if (locks != null) { + currentLocks.addAll(locks); + } + } + + List<PrioritizedMessage> allowed = new ArrayList<>(); + for (PrioritizedMessage m : waiting) { + List<List<String>> neededLocks = determineConcurency(m.message); + if (allowed(currentLocks, neededLocks)) { + allowed.add(m); + } + } + + // Remove messages that are waiting on hold + Iterator<PrioritizedMessage> ii = allowed.iterator(); + while (ii.hasNext()) { + PrioritizedMessage pm = ii.next(); + if (pm.updateGroup != null) { + log.info(" --- Check: " + pm.message + ": " + pm.timeInQueue + " >= " + updateWaitTime); + if (pm.timeInQueue < updateWaitTime) { + ii.remove(); + } + } + } + + // Limit the number of processing messages to maxParallelCount + + for (int i = 0; i < allowed.size() && i < maxParallelCount - processing.size(); i++) { + ll.add(allowed.get(i)); + } + + return ll; + } + + private int comparePrioritizedMessages(PrioritizedMessage pm1, PrioritizedMessage pm2) { + if (pm1.timeInQueue >= maxTimeInQueue && pm2.timeInQueue < maxTimeInQueue) { + return -1; + } + if (pm1.timeInQueue < maxTimeInQueue && pm2.timeInQueue >= maxTimeInQueue) { + return 1; + } + if (pm1.priority < pm2.priority) { + return -1; + } + if (pm1.priority > pm2.priority) { + return 1; + } + if (pm1.timeInQueue > pm2.timeInQueue) { + return -1; + } + if (pm1.timeInQueue < pm2.timeInQueue) { + return 1; + } + return 0; + } + + private int getTimeInQueue(Date now, Message m) { + // Find the elapsed time since message arrived. That will be the timestamp of the first + // status of the message (last status in the status history list) + MessageStatus receivedStatus = m.statusHistory.get(m.statusHistory.size() - 1); + return (int) ((now.getTime() - receivedStatus.timestamp.getTime()) / 1000); + } + + private boolean allowed(List<List<String>> currentLocks, List<List<String>> neededLocks) { + if (neededLocks == null) { + return true; + } + for (List<String> neededLockLevels : neededLocks) { + for (List<String> currentLockLevels : currentLocks) { + int n = neededLockLevels.size() < currentLockLevels.size() ? neededLockLevels.size() : currentLockLevels.size(); + boolean good = false; + for (int i = 0; i < n; i++) { + if (!neededLockLevels.get(i).equals(currentLockLevels.get(i))) { + good = true; + break; + } + } + if (!good) { + return false; + } + } + } + return true; + } + + protected void initMessage(Message msg) { + } + + protected void closeMessage(Message msg) { + } + + protected long assignPriority(Message msg) { + return msg.messageId; // FIFO by default + } + + protected String determineUpdateGroup(Message msg) { + return null; + } + + protected long determineUpdateSequence(Message msg) { + return msg.messageId; + } + + protected List<List<String>> determineConcurency(Message msg) { + return null; + } + + protected MessageData ackResponse(Message msg) { + return null; + } + + protected MessageData completeResponse(Resolution r, Message msg) { + return null; + } + + protected static enum Resolution { + PROCESSED, SKIPPED, EXPIRED, DROPPED + } + + protected static class PrioritizedMessage { + + public Message message; + public long priority; + public int timeInQueue; + public String updateGroup; + public long updateSequence; + } + + public void setMaxParallelCount(int maxParallelCount) { + this.maxParallelCount = maxParallelCount; + } + + public void setTimeToLive(int timeToLive) { + this.timeToLive = timeToLive; + } + + public void setMaxTimeInQueue(int maxTimeInQueue) { + this.maxTimeInQueue = maxTimeInQueue; + } + + public void setUpdateWaitTime(int updateWaitTime) { + this.updateWaitTime = updateWaitTime; + } + + public void setMaxQueueSize(int maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } + + public void setAsync(boolean async) { + this.async = async; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java new file mode 100644 index 000000000..61059e067 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorFactoryImpl.java @@ -0,0 +1,57 @@ +package org.onap.ccsdk.features.lib.doorman.impl; + +import java.util.Map; +import org.onap.ccsdk.features.lib.doorman.MessageClassifier; +import org.onap.ccsdk.features.lib.doorman.MessageInterceptor; +import org.onap.ccsdk.features.lib.doorman.MessageInterceptorFactory; +import org.onap.ccsdk.features.lib.doorman.MessageProcessor; +import org.onap.ccsdk.features.lib.doorman.MessageQueueHandler; +import org.onap.ccsdk.features.lib.doorman.dao.MessageDao; +import org.onap.ccsdk.features.lib.rlock.LockHelper; + +public class MessageInterceptorFactoryImpl implements MessageInterceptorFactory { + + private MessageClassifier messageClassifier; + private Map<String, MessageQueueHandler> handlerMap; + private MessageProcessor messageProcessor; + private MessageDao messageDao; + + private LockHelper lockHelper; + private int lockTimeout; // in seconds + + @Override + public MessageInterceptor create() { + MessageInterceptorImpl mi = new MessageInterceptorImpl(); + mi.setMessageClassifier(messageClassifier); + mi.setHandlerMap(handlerMap); + mi.setMessageProcessor(messageProcessor); + mi.setMessageDao(messageDao); + mi.setLockHelper(lockHelper); + mi.setLockTimeout(lockTimeout); + return mi; + } + + public void setMessageDao(MessageDao messageDao) { + this.messageDao = messageDao; + } + + public void setLockHelper(LockHelper lockHelper) { + this.lockHelper = lockHelper; + } + + public void setLockTimeout(int lockTimeout) { + this.lockTimeout = lockTimeout; + } + + public void setMessageClassifier(MessageClassifier messageClassifier) { + this.messageClassifier = messageClassifier; + } + + public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) { + this.handlerMap = handlerMap; + } + + public void setMessageProcessor(MessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java new file mode 100644 index 000000000..d2ab97101 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/impl/MessageInterceptorImpl.java @@ -0,0 +1,336 @@ +package org.onap.ccsdk.features.lib.doorman.impl; + +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; +import org.onap.ccsdk.features.lib.doorman.MessageClassifier; +import org.onap.ccsdk.features.lib.doorman.MessageInterceptor; +import org.onap.ccsdk.features.lib.doorman.MessageProcessor; +import org.onap.ccsdk.features.lib.doorman.MessageQueueHandler; +import org.onap.ccsdk.features.lib.doorman.dao.MessageDao; +import org.onap.ccsdk.features.lib.doorman.data.ActionStatus; +import org.onap.ccsdk.features.lib.doorman.data.Event; +import org.onap.ccsdk.features.lib.doorman.data.Message; +import org.onap.ccsdk.features.lib.doorman.data.MessageAction; +import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatus; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue; +import org.onap.ccsdk.features.lib.doorman.data.Queue; +import org.onap.ccsdk.features.lib.rlock.LockHelper; +import org.onap.ccsdk.features.lib.rlock.SynchronizedFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageInterceptorImpl implements MessageInterceptor { + + private static final Logger log = LoggerFactory.getLogger(MessageInterceptorImpl.class); + + private MessageClassifier messageClassifier; + private Map<String, MessageQueueHandler> handlerMap; + private MessageProcessor messageProcessor; + private MessageDao messageDao; + + private LockHelper lockHelper; + private int lockTimeout = 10; // in seconds + + private Message message = null; + private MessageQueueHandler handler = null; + + @Override + public MessageData processRequest(MessageData request) { + Date now = new Date(); + + Queue q = null; + String extMessageId = null; + if (messageClassifier != null) { + q = messageClassifier.determineQueue(request); + extMessageId = messageClassifier.getExtMessageId(request); + } + + long id = messageDao.addArrivedMessage(extMessageId, request, q, now); + + MessageStatus arrivedStatus = new MessageStatus(); + arrivedStatus.status = MessageStatusValue.ARRIVED; + arrivedStatus.timestamp = now; + messageDao.addStatus(id, arrivedStatus); + + message = new Message(); + message.messageId = id; + message.request = request; + message.arrivedTimestamp = now; + message.queue = q; + message.extMessageId = extMessageId; + + log.info("Message received: " + message); + + if (q != null && handlerMap != null) { + handler = handlerMap.get(q.type); + } + + if (q == null || handler == null) { + processSync(); + return null; // Do nothing, normal message processing + } + + Event event = Event.ARRIVED; + + while (true) { + MessageFunction func = new MessageFunction(event); + func.exec(); + + MessageAction nextAction = func.getNextAction(); + if (nextAction == null) { + processSync(); + return null; + } + + switch (nextAction.action) { + + case PROCESS: + processSync(); + return null; + + case HOLD: { + event = waitForNewAction(nextAction.holdTime); + break; + } + + case RETURN_COMPLETE: + returnComplete(nextAction); + return nextAction.returnResponse; + + case RETURN_PROCESS: + processAsync(nextAction.returnResponse); + return nextAction.returnResponse; + + case RETURN_HOLD: { + returnHold(nextAction); + return nextAction.returnResponse; + } + } + } + } + + private void processSync() { + messageDao.updateMessageStarted(message.messageId, new Date()); + log.info("Message processing started: " + message); + } + + private void processAsync(MessageData returnResponse) { + Thread t = new Thread(() -> processMessage(message.request), message.queue.type + "::" + message.queue.id + "::" + message.messageId); + t.start(); + + messageDao.updateMessageResponse(message.messageId, new Date(), returnResponse); + } + + private void processMessage(MessageData request) { + messageDao.updateMessageStarted(message.messageId, new Date()); + log.info("Message processing started: " + message); + if (messageProcessor != null) { + messageProcessor.processMessage(request); + } + + MessageFunction func = new MessageFunction(Event.COMPLETED); + func.exec(); + MessageAction nextAction = func.getNextAction(); + + messageDao.updateMessageCompleted(message.messageId, nextAction.resolution, new Date()); + log.info("Message processing completed: " + message); + } + + private void returnComplete(MessageAction nextAction) { + Date now = new Date(); + messageDao.updateMessageResponse(message.messageId, now, nextAction.returnResponse); + messageDao.updateMessageCompleted(message.messageId, nextAction.resolution, now); + log.info("Message processing completed: " + message); + } + + private void returnHold(MessageAction nextAction) { + Thread t = new Thread(() -> asyncQueue(nextAction), message.queue.type + "::" + message.queue.id + "::" + message.messageId); + t.start(); + messageDao.updateMessageResponse(message.messageId, new Date(), nextAction.returnResponse); + } + + private void asyncQueue(MessageAction nextAction) { + Event event = waitForNewAction(nextAction.holdTime); + + while (true) { + MessageFunction func = new MessageFunction(event); + func.exec(); + + nextAction = func.getNextAction(); + if (nextAction == null) { + processMessage(message.request); + return; + } + + switch (nextAction.action) { + case PROCESS: + processMessage(message.request); + return; + + case HOLD: { + event = waitForNewAction(nextAction.holdTime); + break; + } + + default: + return; + } + } + } + + private Event waitForNewAction(int holdTime) { + long startTime = System.currentTimeMillis(); + long currentTime = startTime; + while (currentTime - startTime <= (holdTime + 1) * 1000) { + try { + Thread.sleep(5000); + } catch (Exception e) { + } + + MessageAction nextAction = messageDao.getNextAction(message.messageId); + if (nextAction != null && nextAction.action != MessageActionValue.HOLD) { + return Event.AWAKEN; + } + + currentTime = System.currentTimeMillis(); + } + return Event.CHECK; + } + + @Override + public void processResponse(MessageData response) { + if (message == null) { + return; + } + + String resolution = null; + if (message.queue != null && handler != null) { + MessageFunction func = new MessageFunction(Event.COMPLETED); + func.exec(); + MessageAction nextAction = func.getNextAction(); + if (nextAction != null) { + resolution = nextAction.resolution; + } + } + + Date now = new Date(); + messageDao.updateMessageResponse(message.messageId, now, response); + messageDao.updateMessageCompleted(message.messageId, resolution, now); + log.info("Message processing completed: " + message); + } + + private class MessageFunction extends SynchronizedFunction { + + private Event event; + + private MessageAction nextAction = null; // Output + + public MessageFunction(Event event) { + super(lockHelper, Collections.singleton(message.queue.type + "::" + message.queue.id), lockTimeout); + this.event = event; + } + + @Override + protected void _exec() { + List<Message> messageQueue = messageDao.readMessageQueue(message.queue); + if (event == Event.AWAKEN) { + for (Message m : messageQueue) { + if (m.messageId == message.messageId) { + nextAction = m.actionHistory.get(0); + } + } + if (nextAction != null) { + messageDao.updateActionDone(nextAction.actionId, new Date()); + } + } else { + Map<Long, MessageAction> nextActionMap = handler.nextAction(event, message, messageQueue); + if (nextActionMap != null) { + for (Message m : messageQueue) { + MessageAction action = nextActionMap.get(m.messageId); + if (action != null) { + if (m.messageId == message.messageId) { + action.actionStatus = ActionStatus.DONE; + action.doneTimestamp = new Date(); + messageDao.addAction(m.messageId, action); + nextAction = action; + } else { + MessageAction lastAction = m.actionHistory.get(0); + if (lastAction.actionStatus != ActionStatus.PENDING || !action.same(lastAction)) { + action.actionStatus = ActionStatus.PENDING; + messageDao.addAction(m.messageId, action); + } + } + } + } + } + } + if (nextAction != null) { + log.info("Next message action: " + message + ":" + nextAction.action); + MessageStatus status = determineStatus(nextAction); + if (status != null) { + messageDao.addStatus(message.messageId, status); + log.info("Updating message status: " + message + ":" + status.status); + } + } + } + + public MessageAction getNextAction() { + return nextAction; + } + } + + private MessageStatus determineStatus(MessageAction action) { + if (action == null) { + return null; + } + + MessageStatus s = new MessageStatus(); + s.timestamp = new Date(); + + switch (action.action) { + case PROCESS: + s.status = MessageStatusValue.PROCESSING_SYNC; + break; + case HOLD: + case RETURN_HOLD: + s.status = MessageStatusValue.IN_QUEUE; + break; + case RETURN_PROCESS: + s.status = MessageStatusValue.PROCESSING_ASYNC; + break; + case RETURN_COMPLETE: + s.status = MessageStatusValue.COMPLETED; + break; + } + + return s; + } + + public void setMessageDao(MessageDao messageDao) { + this.messageDao = messageDao; + } + + public void setLockHelper(LockHelper lockHelper) { + this.lockHelper = lockHelper; + } + + public void setLockTimeout(int lockTimeout) { + this.lockTimeout = lockTimeout; + } + + public void setMessageClassifier(MessageClassifier messageClassifier) { + this.messageClassifier = messageClassifier; + } + + public void setHandlerMap(Map<String, MessageQueueHandler> handlerMap) { + this.handlerMap = handlerMap; + } + + public void setMessageProcessor(MessageProcessor messageProcessor) { + this.messageProcessor = messageProcessor; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java new file mode 100644 index 000000000..73c3d8b6a --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/servlet/MessageInterceptorFilter.java @@ -0,0 +1,236 @@ +package org.onap.ccsdk.features.lib.doorman.servlet; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.CharArrayWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; +import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.HttpServletResponseWrapper; +import org.onap.ccsdk.features.lib.doorman.MessageInterceptor; +import org.onap.ccsdk.features.lib.doorman.MessageInterceptorFactory; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; + +public class MessageInterceptorFilter implements Filter { + + private MessageInterceptorFactory messageInterceptorFactory; + + @Override + public void init(FilterConfig filterConfig) throws ServletException { + } + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { + RequestWrapper req = new RequestWrapper((HttpServletRequest) request); + MessageData requestData = getMessageRequest(req); + + MessageInterceptor interceptor = messageInterceptorFactory.create(); + MessageData responseData = interceptor.processRequest(requestData); + + if (responseData == null) { + ResponseWrapper res = new ResponseWrapper((HttpServletResponse) response); + + chain.doFilter(req, res); + + responseData = res.getMessageResponse(); + interceptor.processResponse(responseData); + } + + setMessageResponse((HttpServletResponse) response, responseData); + } + + @SuppressWarnings("unchecked") + private void setMessageResponse(HttpServletResponse response, MessageData responseData) throws IOException { + if (responseData.param != null) { + String contentType = (String) responseData.param.get("content_type"); + if (contentType != null) { + response.setContentType(contentType); + } + Integer httpCode = (Integer) responseData.param.get("http_code"); + if (httpCode != null) { + response.setStatus(httpCode); + } + Map<String, Object> headers = (Map<String, Object>) responseData.param.get("headers"); + if (headers != null) { + for (Entry<String, Object> entry : headers.entrySet()) { + String name = entry.getKey(); + Object v = entry.getValue(); + if (v instanceof List) { + List<Object> ll = (List<Object>) v; + for (Object o : ll) { + response.addHeader(name, o.toString()); + } + } else { + response.setHeader(name, v.toString()); + } + } + } + } + + if (responseData.body != null) { + response.setContentLength(responseData.body.length()); + response.getWriter().write(responseData.body); + } + } + + @SuppressWarnings("unchecked") + private MessageData getMessageRequest(RequestWrapper request) { + MessageData requestData = new MessageData(); + requestData.param = new HashMap<>(); + requestData.param.put("http_method", request.getMethod()); + requestData.param.put("uri", request.getPathInfo()); + requestData.param.put("param", request.getParameterMap()); + requestData.param.put("content_type", request.getContentType()); + + Map<String, Object> headers = new HashMap<>(); + Enumeration<String> headerNames = request.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String name = headerNames.nextElement(); + Enumeration<String> values = request.getHeaders(name); + List<String> valueList = new ArrayList<>(); + while (values.hasMoreElements()) { + valueList.add(values.nextElement()); + } + if (valueList.size() > 1) { + headers.put(name, valueList); + } else { + headers.put(name, valueList.get(0)); + } + } + requestData.param.put("headers", headers); + + requestData.body = request.getBody(); + + return requestData; + } + + @Override + public void destroy() { + } + + private static class RequestWrapper extends HttpServletRequestWrapper { + + private final String body; + + public RequestWrapper(HttpServletRequest request) throws IOException { + super(request); + + StringBuilder stringBuilder = new StringBuilder(); + try (InputStream inputStream = request.getInputStream()) { + if (inputStream != null) { + try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) { + char[] charBuffer = new char[128]; + int bytesRead = -1; + while ((bytesRead = bufferedReader.read(charBuffer)) > 0) { + stringBuilder.append(charBuffer, 0, bytesRead); + } + } + } + } + body = stringBuilder.toString(); + } + + @Override + public ServletInputStream getInputStream() throws IOException { + final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body.getBytes()); + ServletInputStream servletInputStream = new ServletInputStream() { + + @Override + public int read() throws IOException { + return byteArrayInputStream.read(); + } + }; + return servletInputStream; + } + + @Override + public BufferedReader getReader() throws IOException { + return new BufferedReader(new InputStreamReader(getInputStream())); + } + + public String getBody() { + return body; + } + } + + public class ResponseWrapper extends HttpServletResponseWrapper { + + private CharArrayWriter writer = new CharArrayWriter(); + private Map<String, Object> param = new HashMap<>(); + + public ResponseWrapper(HttpServletResponse response) { + super(response); + } + + @Override + public PrintWriter getWriter() { + return new PrintWriter(writer); + } + + @Override + public void setStatus(int status) { + param.put("http_code", status); + } + + @SuppressWarnings("unchecked") + @Override + public void setHeader(String name, String value) { + Map<String, Object> headers = (Map<String, Object>) param.get("headers"); + if (headers == null) { + headers = new HashMap<>(); + param.put("headers", headers); + } + headers.put(name, value); + } + + @SuppressWarnings("unchecked") + @Override + public void addHeader(String name, String value) { + Map<String, Object> headers = (Map<String, Object>) param.get("headers"); + if (headers == null) { + headers = new HashMap<>(); + param.put("headers", headers); + } + Object v = headers.get(name); + if (v == null) { + headers.put(name, value); + } else if (v instanceof List) { + ((List<Object>) v).add(value); + } else { + List<Object> ll = new ArrayList<>(); + ll.add(v); + ll.add(value); + headers.put(name, ll); + } + } + + public MessageData getMessageResponse() { + MessageData responseData = new MessageData(); + responseData.param = param; + responseData.body = writer.toString(); + return responseData; + } + } + + public void setMessageInterceptorFactory(MessageInterceptorFactory messageInterceptorFactory) { + this.messageInterceptorFactory = messageInterceptorFactory; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java new file mode 100644 index 000000000..42f1b21e7 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/DataUtil.java @@ -0,0 +1,273 @@ +package org.onap.ccsdk.features.lib.doorman.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DataUtil { + + @SuppressWarnings("unused") + private static final Logger log = LoggerFactory.getLogger(DataUtil.class); + + @SuppressWarnings("unchecked") + public static Object get(Object struct, String compositeKey) { + if (struct == null || compositeKey == null) { + return null; + } + + List<Object> keys = splitCompositeKey(compositeKey); + Object currentValue = struct; + String currentKey = ""; + + for (Object key : keys) { + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + + if (keyi >= currentValueL.size()) { + return null; + } + + currentValue = currentValueL.get(keyi); + if (currentValue == null) { + return null; + } + + currentKey += "[" + key + "]"; + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); + } + + currentValue = ((Map<String, Object>) currentValue).get(key); + if (currentValue == null) { + return null; + } + + currentKey += "." + key; + } + } + return currentValue; + } + + @SuppressWarnings("unchecked") + public static void set(Object struct, String compositeKey, Object value) { + if (struct == null) { + throw new IllegalArgumentException("Null argument: struct"); + } + + if (compositeKey == null || compositeKey.length() == 0) { + throw new IllegalArgumentException("Null or empty argument: compositeKey"); + } + + if (value == null) { + return; + } + + List<Object> keys = splitCompositeKey(compositeKey); + Object currentValue = struct; + String currentKey = ""; + + for (int i = 0; i < keys.size() - 1; i++) { + Object key = keys.get(i); + + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + int size = currentValueL.size(); + + if (keyi >= size) { + for (int k = 0; k < keyi - size + 1; k++) { + currentValueL.add(null); + } + } + + Object newValue = currentValueL.get(keyi); + if (newValue == null) { + Object nextKey = keys.get(i + 1); + if (nextKey instanceof Integer) { + newValue = new ArrayList<>(); + } else { + newValue = new HashMap<>(); + } + currentValueL.set(keyi, newValue); + } + + currentValue = newValue; + currentKey += "[" + key + "]"; + + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); + } + + Object newValue = ((Map<String, Object>) currentValue).get(key); + if (newValue == null) { + Object nextKey = keys.get(i + 1); + if (nextKey instanceof Integer) { + newValue = new ArrayList<>(); + } else { + newValue = new HashMap<>(); + } + ((Map<String, Object>) currentValue).put((String) key, newValue); + } + + currentValue = newValue; + currentKey += "." + key; + } + } + + Object key = keys.get(keys.size() - 1); + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + int size = currentValueL.size(); + + if (keyi >= size) { + for (int k = 0; k < keyi - size + 1; k++) { + currentValueL.add(null); + } + } + + currentValueL.set(keyi, value); + + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); + } + + ((Map<String, Object>) currentValue).put((String) key, value); + } + } + + @SuppressWarnings("unchecked") + public static void delete(Object struct, String compositeKey) { + if (struct == null) { + throw new IllegalArgumentException("Null argument: struct"); + } + + if (compositeKey == null) { + throw new IllegalArgumentException("Null argument: compositeKey"); + } + + List<Object> keys = splitCompositeKey(compositeKey); + Object currentValue = struct; + String currentKey = ""; + + for (int i = 0; i < keys.size() - 1; i++) { + Object key = keys.get(i); + + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + + if (keyi >= currentValueL.size()) { + return; + } + + currentValue = currentValueL.get(keyi); + if (currentValue == null) { + return; + } + + currentKey += "[" + key + "]"; + + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); + } + + currentValue = ((Map<String, Object>) currentValue).get(key); + if (currentValue == null) { + return; + } + + currentKey += "." + key; + } + } + + Object key = keys.get(keys.size() - 1); + if (key instanceof Integer) { + if (!(currentValue instanceof List)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References list '" + currentKey + "', but '" + currentKey + "' is not a list"); + } + + Integer keyi = (Integer) key; + List<Object> currentValueL = (List<Object>) currentValue; + + if (keyi < currentValueL.size()) { + currentValueL.remove(keyi); + } + + } else { + if (!(currentValue instanceof Map)) { + throw new IllegalArgumentException("Cannot resolve: " + compositeKey + ": References map '" + currentKey + "', but '" + currentKey + "' is not a map"); + } + + ((Map<String, Object>) currentValue).remove(key); + } + } + + private static List<Object> splitCompositeKey(String compositeKey) { + if (compositeKey == null) { + return Collections.emptyList(); + } + + String[] ss = compositeKey.split("\\."); + List<Object> ll = new ArrayList<>(); + for (String s : ss) { + if (s.length() == 0) { + continue; + } + + int i1 = s.indexOf('['); + if (i1 < 0) { + ll.add(s); + } else { + if (!s.endsWith("]")) { + throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": No matching ] found"); + } + + String s1 = s.substring(0, i1); + if (s1.length() > 0) { + ll.add(s1); + } + + String s2 = s.substring(i1 + 1, s.length() - 1); + try { + int n = Integer.parseInt(s2); + if (n < 0) { + throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": Index must be >= 0: " + n); + } + + ll.add(n); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid composite key: " + compositeKey + ": Index not a number: " + s2); + } + } + } + + return ll; + } +} diff --git a/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java new file mode 100644 index 000000000..fc9ce0936 --- /dev/null +++ b/lib/doorman/src/main/java/org/onap/ccsdk/features/lib/doorman/util/JsonUtil.java @@ -0,0 +1,319 @@ +package org.onap.ccsdk.features.lib.doorman.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class JsonUtil { + + public static Object jsonToData(String s) { + if (s == null) { + return null; + } + return jsonToData(new Current(s)); + } + + private static class Current { + + public String s; + public int i; + public int line, pos; + + public Current(String s) { + this.s = s; + i = 0; + line = 1; + pos = 1; + } + + public void move() { + i++; + pos++; + } + + public void move(int k) { + i += k; + pos += k; + } + + public boolean end() { + return i >= s.length(); + } + + public char get() { + return s.charAt(i); + } + + public boolean is(String ss) { + return i < s.length() - ss.length() && s.substring(i, i + ss.length()).equals(ss); + } + + public boolean is(char c) { + return i < s.length() && s.charAt(i) == c; + } + + public void skipWhiteSpace() { + while (i < s.length() && s.charAt(i) <= 32) { + char cc = s.charAt(i); + if (cc == '\n') { + line++; + pos = 1; + } else if (cc == ' ' || cc == '\t') { + pos++; + } + i++; + } + } + } + + public static Object jsonToData(Current c) { + c.skipWhiteSpace(); + + if (c.end()) { + return ""; + } + + char cc = c.get(); + if (cc == '{') { + c.move(); + return jsonToMap(c); + } + if (cc == '[') { + c.move(); + return jsonToList(c); + } + return jsonToObject(c); + } + + private static Object jsonToObject(Current c) { + if (c.is('"')) { + c.move(); + StringBuilder ss = new StringBuilder(); + while (!c.end() && c.get() != '"') { + if (c.get() == '\\') { + c.move(); + char cc = c.get(); + switch (cc) { + case '\\': + ss.append('\\'); + break; + case '"': + ss.append('\"'); + break; + case 'n': + ss.append('\n'); + break; + case 'r': + ss.append('\r'); + break; + case 't': + ss.append('\t'); + break; + default: + throw new IllegalArgumentException("JSON parsing error: Invalid escaped character at (" + c.line + ':' + c.pos + "): " + cc); + } + } else { + ss.append(c.get()); + } + c.move(); + } + if (!c.end()) { + c.move(); // Skip the closing " + } + return ss.toString(); + } + if (c.is("true")) { + c.move(4); + return true; + } + if (c.is("false")) { + c.move(5); + return false; + } + if (c.is("null")) { + c.move(4); + return null; + } + StringBuilder ss = new StringBuilder(); + while (!c.end() && c.get() != ',' && c.get() != ']' && c.get() != '}' && c.get() != '\n' && c.get() != '\t') { + ss.append(c.get()); + c.move(); + } + try { + return Long.valueOf(ss.toString()); + } catch (Exception e1) { + try { + return Double.valueOf(ss.toString()); + } catch (Exception e2) { + throw new IllegalArgumentException("JSON parsing error: Invalid value at (" + c.line + ':' + c.pos + "): " + ss.toString()); + } + } + } + + private static List<Object> jsonToList(Current c) { + List<Object> ll = new ArrayList<>(); + c.skipWhiteSpace(); + while (!c.end() && c.get() != ']') { + Object value = jsonToData(c); + + ll.add(value); + + c.skipWhiteSpace(); + if (c.is(',')) { + c.move(); + c.skipWhiteSpace(); + } + } + + if (!c.end()) { + c.move(); // Skip the closing ] + } + + return ll; + } + + private static Map<String, Object> jsonToMap(Current c) { + Map<String, Object> mm = new HashMap<>(); + c.skipWhiteSpace(); + while (!c.end() && c.get() != '}') { + Object key = jsonToObject(c); + if (!(key instanceof String)) { + throw new IllegalArgumentException("JSON parsing error: Invalid key at (" + c.line + ':' + c.pos + "): " + key); + } + + c.skipWhiteSpace(); + if (!c.is(':')) { + throw new IllegalArgumentException("JSON parsing error: Expected ':' at (" + c.line + ':' + c.pos + ")"); + } + + c.move(); // Skip the : + Object value = jsonToData(c); + + mm.put((String) key, value); + + c.skipWhiteSpace(); + if (c.is(',')) { + c.move(); + c.skipWhiteSpace(); + } + } + + if (!c.end()) { + c.move(); // Skip the closing } + } + + return mm; + } + + public static String dataToJson(Object o, boolean escape) { + StringBuilder sb = new StringBuilder(); + str(sb, o, 0, false, escape); + return sb.toString(); + } + + public static String dataToJson(Object o) { + return dataToJson(o, true); + } + + @SuppressWarnings("unchecked") + private static void str(StringBuilder ss, Object o, int indent, boolean padFirst, boolean escape) { + if (o == null || o instanceof Boolean || o instanceof Number) { + if (padFirst) { + ss.append(pad(indent)); + } + ss.append(o); + return; + } + + if (o instanceof String) { + String s = escape ? escapeJson((String) o) : (String) o; + if (padFirst) { + ss.append(pad(indent)); + } + ss.append('"').append(s).append('"'); + return; + } + + if (o instanceof Number || o instanceof Boolean) { + if (padFirst) { + ss.append(pad(indent)); + } + ss.append(o); + return; + } + + if (o instanceof Map) { + Map<String, Object> mm = (Map<String, Object>) o; + + if (padFirst) { + ss.append(pad(indent)); + } + ss.append("{\n"); + + boolean first = true; + for (String k : mm.keySet()) { + if (!first) { + ss.append(",\n"); + } + first = false; + + Object v = mm.get(k); + ss.append(pad(indent + 1)); + if (escape) { + ss.append('"'); + } + ss.append(k); + if (escape) { + ss.append('"'); + } + ss.append(": "); + str(ss, v, indent + 1, false, escape); + } + + ss.append("\n"); + ss.append(pad(indent)).append('}'); + + return; + } + + if (o instanceof List) { + List<Object> ll = (List<Object>) o; + + if (padFirst) { + ss.append(pad(indent)); + } + ss.append("[\n"); + + boolean first = true; + for (Object o1 : ll) { + if (!first) { + ss.append(",\n"); + } + first = false; + + str(ss, o1, indent + 1, true, escape); + } + + ss.append("\n"); + ss.append(pad(indent)).append(']'); + } + } + + private static String escapeJson(String v) { + String s = v.replaceAll("\\\\", "\\\\\\\\"); + s = s.replaceAll("\"", "\\\\\""); + s = s.replaceAll("\n", "\\\\n"); + s = s.replaceAll("\r", "\\\\r"); + s = s.replaceAll("\t", "\\\\t"); + return s; + } + + private static String pad(int n) { + String s = ""; + for (int i = 0; i < n; i++) { + s += " "; + } + return s; + } +} diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java new file mode 100644 index 000000000..32e94e5d3 --- /dev/null +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueDataItem.java @@ -0,0 +1,91 @@ +package org.onap.ccsdk.features.lib.doorman.it; + +import java.text.SimpleDateFormat; +import java.util.Date; +import org.onap.ccsdk.features.lib.doorman.data.MessageAction; +import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatus; + +public class MessageQueueDataItem implements Comparable<MessageQueueDataItem> { + + public Date timeStamp; + public String extMessageId; + public MessageStatus status; + public MessageAction action; + + @Override + public int compareTo(MessageQueueDataItem other) { + int c = timeStamp.compareTo(other.timeStamp); + if (c == 0) { + if (action != null && other.status != null) { + return -1; + } + if (status != null && other.action != null) { + return 1; + } + } + return c; + } + + @Override + public int hashCode() { + return System.identityHashCode(extMessageId); + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o instanceof MessageQueueDataItem)) { + return false; + } + MessageQueueDataItem other = (MessageQueueDataItem) o; + if (!extMessageId.equals(other.extMessageId)) { + return false; + } + if (status != null && (other.status == null || status.status != other.status.status)) { + return false; + } + if (action != null) { + if (other.action == null || action.action != other.action.action) { + return false; + } + if (action.action == MessageActionValue.HOLD || action.action == MessageActionValue.RETURN_HOLD) { + if (action.holdTime != other.action.holdTime) { + return false; + } + } else if (action.action == MessageActionValue.RETURN_COMPLETE) { + if (!action.resolution.equals(other.action.resolution)) { + return false; + } + } + } + return true; + } + + @Override + public String toString() { + StringBuilder ss = new StringBuilder(); + if (timeStamp != null) { + ss.append(df.format(timeStamp)).append(" | "); + } + if (status != null) { + ss.append("STATUS: "); + } else { + ss.append("ACTION: "); + } + ss.append(String.format("%-20s | ", extMessageId)); + if (status != null) { + ss.append(status.status); + } else { + ss.append(action.action); + if (action.holdTime > 0) { + ss.append(" | ").append(action.holdTime); + } + if (action.resolution != null) { + ss.append(" | ").append(action.resolution); + } + } + return ss.toString(); + } + + private static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); +} diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java new file mode 100644 index 000000000..ca1c4910d --- /dev/null +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTest.java @@ -0,0 +1,222 @@ +package org.onap.ccsdk.features.lib.doorman.it; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.onap.ccsdk.features.lib.doorman.MessageClassifier; +import org.onap.ccsdk.features.lib.doorman.MessageInterceptor; +import org.onap.ccsdk.features.lib.doorman.MessageInterceptorFactory; +import org.onap.ccsdk.features.lib.doorman.MessageProcessor; +import org.onap.ccsdk.features.lib.doorman.MessageQueueHandler; +import org.onap.ccsdk.features.lib.doorman.dao.MessageDaoImpl; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.data.Queue; +import org.onap.ccsdk.features.lib.doorman.impl.MessageInterceptorFactoryImpl; +import org.onap.ccsdk.features.lib.doorman.testutil.DbUtil; +import org.onap.ccsdk.features.lib.doorman.util.JsonUtil; +import org.onap.ccsdk.features.lib.rlock.LockHelperImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class MessageQueueTest { + + private static final Logger log = LoggerFactory.getLogger(MessageQueueTest.class); + + private String test; + + public MessageQueueTest(String test) { + this.test = test; + } + + @SuppressWarnings("unchecked") + @Test + public void test() throws Exception { + log.info("#########################################################################"); + log.info("MessageQueueTest: " + test + " started"); + + String testSetupJson = readResource("it/" + test + "/test.json"); + Map<String, Object> testSetup = (Map<String, Object>) JsonUtil.jsonToData(testSetupJson); + + Map<String, MessageQueueHandler> handlerMap = new HashMap<>(); + + for (Object o : (List<Object>) testSetup.get("handler_list")) { + Map<String, Object> handlerSetup = (Map<String, Object>) o; + String queueType = (String) handlerSetup.get("queue_type"); + String handlerClassName = (String) handlerSetup.get("handler_class"); + try { + Class<?> handlerClass = Class.forName("org.onap.ccsdk.features.lib.doorman.it." + test + "." + handlerClassName); + MessageQueueHandler handler = (MessageQueueHandler) handlerClass.newInstance(); + handlerMap.put(queueType, handler); + log.info("Handler found for queue type: " + queueType + ": " + handlerClass.getName()); + } catch (Exception e) { + } + } + + MessageInterceptorFactory factory = setupMessageInterceptorFactory(handlerMap, new Classifier(), new Processor()); + + List<Object> requestList = (List<Object>) testSetup.get("request_list"); + + List<Thread> threadList = new ArrayList<>(); + + for (int i = 0; i < requestList.size(); i++) { + Map<String, Object> requestSetup = (Map<String, Object>) requestList.get(i); + + MessageData request = new MessageData(); + request.param = (Map<String, Object>) requestSetup.get("request_param"); + Map<String, Object> requestBodyData = (Map<String, Object>) requestSetup.get("request_body"); + if (requestBodyData != null) { + request.body = JsonUtil.dataToJson(requestBodyData); + } else { + request.body = readResource("it/" + test + "/" + requestSetup.get("request_body_file")); + } + + MessageData response = new MessageData(); + response.param = (Map<String, Object>) requestSetup.get("response_param"); + Map<String, Object> responseBodyData = (Map<String, Object>) requestSetup.get("response_body"); + if (responseBodyData != null) { + response.body = JsonUtil.dataToJson(responseBodyData); + } else { + response.body = readResource("it/" + test + "/" + requestSetup.get("response_body_file")); + } + + long startTime = (Long) requestSetup.get("start_time"); + long processTime = (Long) requestSetup.get("process_time"); + + MessageInterceptor interceptor = factory.create(); + + Thread t = new Thread((Runnable) () -> { + try { + Thread.sleep(startTime); + } catch (InterruptedException e) { + } + + MessageData r = interceptor.processRequest(request); + + if (r == null) { + try { + Thread.sleep(processTime); + } catch (InterruptedException e) { + } + + interceptor.processResponse(response); + } + + }, "Message-" + i); + + threadList.add(t); + t.start(); + } + + for (Thread t : threadList) { + t.join(); + } + + log.info("MessageQueueTest: " + test + " completed"); + log.info("Result:"); + + String testResultJson = readResource("it/" + test + "/result.json"); + Map<String, Object> testResult = (Map<String, Object>) JsonUtil.jsonToData(testResultJson); + MessageQueueTestResult.checkResult(testResult); + } + + private static class Classifier implements MessageClassifier { + + @Override + public Queue determineQueue(MessageData request) { + Queue q = new Queue(); + q.type = (String) request.param.get("queue_type"); + q.id = (String) request.param.get("queue_id"); + return q; + } + + @Override + public String getExtMessageId(MessageData request) { + return (String) request.param.get("request_id"); + } + } + + private static class Processor implements MessageProcessor { + + @Override + public void processMessage(MessageData request) { + long processTime = (Long) request.param.get("process_time"); + try { + Thread.sleep(processTime); + } catch (InterruptedException e) { + } + } + } + + public static MessageInterceptorFactory setupMessageInterceptorFactory(Map<String, MessageQueueHandler> handlerMap, MessageClassifier classifier, MessageProcessor processor) { + LockHelperImpl lockHelper = new LockHelperImpl(); + lockHelper.setDataSource(DbUtil.getDataSource()); + lockHelper.setLockWait(5); + lockHelper.setRetryCount(10); + + MessageDaoImpl messageDao = new MessageDaoImpl(); + messageDao.setDataSource(DbUtil.getDataSource()); + + MessageInterceptorFactoryImpl f = new MessageInterceptorFactoryImpl(); + f.setMessageDao(messageDao); + f.setLockHelper(lockHelper); + f.setLockTimeout(20); + f.setHandlerMap(handlerMap); + f.setMessageClassifier(classifier); + f.setMessageProcessor(processor); + return f; + } + + @Parameters(name = "{0}") + public static Collection<Object[]> allTests() { + List<Object[]> ll = new ArrayList<>(); + + String[] tcList = list("it"); + for (String tc : tcList) { + ll.add(new Object[] { tc }); + } + return ll; + } + + private static String[] list(String dir) { + try { + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + URL url = loader.getResource(dir); + String path = url.getPath(); + return new File(path).list(); + } catch (Exception e) { + log.warn("Error getting directory list for: " + dir + ": " + e.getMessage(), e); + return new String[0]; + } + } + + private static String readResource(String resource) { + try { + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + InputStream ins = loader.getResourceAsStream(resource); + BufferedReader in = new BufferedReader(new InputStreamReader(ins)); + StringBuilder ss = new StringBuilder(); + String line = in.readLine(); + while (line != null) { + ss.append(line).append('\n'); + line = in.readLine(); + } + in.close(); + return ss.toString(); + } catch (Exception e) { + log.warn("Error reading resource: " + resource + ": " + e.getMessage(), e); + return null; + } + } +} diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java new file mode 100644 index 000000000..71a2eeafb --- /dev/null +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/MessageQueueTestResult.java @@ -0,0 +1,120 @@ +package org.onap.ccsdk.features.lib.doorman.it; + +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.onap.ccsdk.features.lib.doorman.dao.MessageDaoImpl; +import org.onap.ccsdk.features.lib.doorman.data.Message; +import org.onap.ccsdk.features.lib.doorman.data.MessageAction; +import org.onap.ccsdk.features.lib.doorman.data.MessageActionValue; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatus; +import org.onap.ccsdk.features.lib.doorman.data.MessageStatusValue; +import org.onap.ccsdk.features.lib.doorman.data.Queue; +import org.onap.ccsdk.features.lib.doorman.testutil.DbUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageQueueTestResult { + + private static final Logger log = LoggerFactory.getLogger(MessageQueueTest.class); + + public static List<MessageQueueDataItem> readResult(String queueType, String queueId) { + MessageDaoImpl messageDao = new MessageDaoImpl(); + messageDao.setDataSource(DbUtil.getDataSource()); + + Queue q = new Queue(); + q.type = queueType; + q.id = queueId; + + List<Message> messageList = messageDao.readMessageQueue(q); + + List<MessageQueueDataItem> ll = new ArrayList<>(); + if (messageList != null) { + for (Message m : messageList) { + if (m.statusHistory != null) { + for (MessageStatus s : m.statusHistory) { + MessageQueueDataItem item = new MessageQueueDataItem(); + item.extMessageId = m.extMessageId; + item.status = s; + item.timeStamp = s.timestamp; + ll.add(item); + } + } + if (m.actionHistory != null) { + for (MessageAction a : m.actionHistory) { + MessageQueueDataItem item = new MessageQueueDataItem(); + item.extMessageId = m.extMessageId; + item.action = a; + item.timeStamp = a.timestamp; + ll.add(item); + } + } + } + } + Collections.sort(ll); + return ll; + } + + @SuppressWarnings("unchecked") + public static void checkResult(Map<String, Object> testResult) { + List<Object> resultSetupList = (List<Object>) testResult.get("queue_list"); + if (resultSetupList != null) { + for (Object o : resultSetupList) { + Map<String, Object> resultSetup = (Map<String, Object>) o; + String queueType = (String) resultSetup.get("queue_type"); + String queueId = (String) resultSetup.get("queue_id"); + log.info(queueType + "::" + queueId); + List<MessageQueueDataItem> itemList = MessageQueueTestResult.readResult(queueType, queueId); + for (MessageQueueDataItem item : itemList) { + log.info(" --- " + item); + } + + List<Object> checkSequenceList = (List<Object>) resultSetup.get("check_sequence_list"); + for (int i = 0; i < checkSequenceList.size(); i++) { + List<Object> checkSequence = (List<Object>) checkSequenceList.get(i); + List<MessageQueueDataItem> checkItemList = new ArrayList<>(); + for (Object o2 : checkSequence) { + String[] ss = ((String) o2).split("\\|"); + MessageQueueDataItem item = new MessageQueueDataItem(); + item.extMessageId = ss[1].trim(); + if (ss[0].trim().equals("STATUS")) { + item.status = new MessageStatus(); + item.status.status = MessageStatusValue.valueOf(ss[2].trim()); + } else { + item.action = new MessageAction(); + item.action.action = MessageActionValue.valueOf(ss[2].trim()); + if (item.action.action == MessageActionValue.HOLD || item.action.action == MessageActionValue.RETURN_HOLD) { + item.action.holdTime = Integer.parseInt(ss[3].trim()); + } else if (item.action.action == MessageActionValue.RETURN_COMPLETE) { + item.action.resolution = ss[3].trim(); + } + } + checkItemList.add(item); + } + List<MessageQueueDataItem> itemList1 = new ArrayList<>(itemList); + itemList1.retainAll(checkItemList); + if (!itemList1.equals(checkItemList)) { + log.info("Expected sequence #" + i + " not found"); + log.info("Expected sequence:"); + for (MessageQueueDataItem item : checkItemList) { + log.info(" --- " + item); + } + log.info("Found sequence:"); + for (MessageQueueDataItem item : itemList1) { + log.info(" --- " + item); + } + fail("Expected sequence #" + i + " not found"); + } else { + log.info("Expected sequence #" + i + " found in the result:"); + for (MessageQueueDataItem item : checkItemList) { + log.info(" --- " + item); + } + } + } + } + } + } +} diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java new file mode 100644 index 000000000..254fd4ce4 --- /dev/null +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Classifier.java @@ -0,0 +1,21 @@ +package org.onap.ccsdk.features.lib.doorman.it.tc1; + +import org.onap.ccsdk.features.lib.doorman.MessageClassifier; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.data.Queue; + +public class Classifier implements MessageClassifier { + + @Override + public Queue determineQueue(MessageData request) { + Queue q = new Queue(); + q.type = "Cluster"; + q.id = "test-queue"; + return q; + } + + @Override + public String getExtMessageId(MessageData request) { + return (String) request.param.get("request_id"); + } +} diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java new file mode 100644 index 000000000..f57fdbc36 --- /dev/null +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc1/Handler.java @@ -0,0 +1,10 @@ +package org.onap.ccsdk.features.lib.doorman.it.tc1; + +import org.onap.ccsdk.features.lib.doorman.impl.MessageHandlerBaseImpl; + +public class Handler extends MessageHandlerBaseImpl { + + public Handler() { + + } +} diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java new file mode 100644 index 000000000..a32f74650 --- /dev/null +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Classifier.java @@ -0,0 +1,21 @@ +package org.onap.ccsdk.features.lib.doorman.it.tc2; + +import org.onap.ccsdk.features.lib.doorman.MessageClassifier; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.data.Queue; + +public class Classifier implements MessageClassifier { + + @Override + public Queue determineQueue(MessageData request) { + Queue q = new Queue(); + q.type = "Cluster"; + q.id = "test-queue"; + return q; + } + + @Override + public String getExtMessageId(MessageData request) { + return (String) request.param.get("request_id"); + } +} diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java new file mode 100644 index 000000000..35b811c5f --- /dev/null +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/it/tc2/Handler.java @@ -0,0 +1,53 @@ +package org.onap.ccsdk.features.lib.doorman.it.tc2; + +import java.util.HashMap; +import java.util.Map; +import org.onap.ccsdk.features.lib.doorman.data.Message; +import org.onap.ccsdk.features.lib.doorman.data.MessageData; +import org.onap.ccsdk.features.lib.doorman.impl.MessageHandlerBaseImpl; +import org.onap.ccsdk.features.lib.doorman.util.JsonUtil; + +public class Handler extends MessageHandlerBaseImpl { + + public Handler() { + setMaxParallelCount(100); + setUpdateWaitTime(20); + } + + @SuppressWarnings("unchecked") + @Override + protected String determineUpdateGroup(Message msg) { + if (msg.request.body != null) { + Map<String, Object> body = (Map<String, Object>) JsonUtil.jsonToData(msg.request.body); + String op = (String) body.get("operation"); + String entityId = (String) body.get("entity_id"); + if ("update".equals(op)) { + return entityId; + } + } + return super.determineUpdateGroup(msg); + } + + @Override + protected long determineUpdateSequence(Message msg) { + if (msg.request.param != null) { + Long n = (Long) msg.request.param.get("sequence_number"); + if (n != null) { + return n; + } + } + return super.determineUpdateSequence(msg); + } + + @Override + protected MessageData completeResponse(Resolution r, Message msg) { + if (r == Resolution.SKIPPED) { + MessageData response = new MessageData(); + response.param = new HashMap<>(); + response.param.put("http_code", 200L); + response.body = "{ \"status\": \"success\" }"; + return response; + } + return super.completeResponse(r, msg); + } +} diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java new file mode 100644 index 000000000..1e525b4b2 --- /dev/null +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/DbUtil.java @@ -0,0 +1,92 @@ +package org.onap.ccsdk.features.lib.doorman.testutil; + +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; + +import javax.sql.DataSource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DbUtil { + + private static final Logger log = LoggerFactory.getLogger(DbUtil.class); + + private static DataSource dataSource = null; + + public static synchronized DataSource getDataSource() { + if (dataSource == null) { + String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1"; + + dataSource = new DataSource() { + + @Override + public <T> T unwrap(Class<T> arg0) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class<?> arg0) throws SQLException { + return false; + } + + @Override + public void setLoginTimeout(int arg0) throws SQLException { + } + + @Override + public void setLogWriter(PrintWriter arg0) throws SQLException { + } + + @Override + public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { + return null; + } + + @Override + public int getLoginTimeout() throws SQLException { + return 0; + } + + @Override + public PrintWriter getLogWriter() throws SQLException { + return null; + } + + @Override + public Connection getConnection(String username, String password) throws SQLException { + return null; + } + + @Override + public Connection getConnection() throws SQLException { + return DriverManager.getConnection(url); + } + }; + + try { + String script = FileUtil.read("/schema.sql"); + + String[] sqlList = script.split(";"); + try (Connection con = dataSource.getConnection()) { + for (String sql : sqlList) { + if (!sql.trim().isEmpty()) { + sql = sql.trim(); + try (PreparedStatement ps = con.prepareStatement(sql)) { + log.info("Executing statement:\n" + sql); + ps.execute(); + } + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return dataSource; + } +} diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java new file mode 100644 index 000000000..5ae92b4fc --- /dev/null +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/testutil/FileUtil.java @@ -0,0 +1,24 @@ +package org.onap.ccsdk.features.lib.doorman.testutil; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; + +public class FileUtil { + + public static String read(String fileName) throws Exception { + String ss = ""; + try (InputStream is = DbUtil.class.getResourceAsStream(fileName)) { + try (InputStreamReader isr = new InputStreamReader(is)) { + try (BufferedReader in = new BufferedReader(isr)) { + String s = in.readLine(); + while (s != null) { + ss += s + '\n'; + s = in.readLine(); + } + } + } + } + return ss; + } +} diff --git a/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java new file mode 100644 index 000000000..f0ac87e24 --- /dev/null +++ b/lib/doorman/src/test/java/org/onap/ccsdk/features/lib/doorman/util/TestJsonUtil.java @@ -0,0 +1,26 @@ +package org.onap.ccsdk.features.lib.doorman.util; + +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; +import org.onap.ccsdk.features.lib.doorman.testutil.FileUtil; +import org.onap.ccsdk.features.lib.doorman.util.JsonUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestJsonUtil { + + private static final Logger log = LoggerFactory.getLogger(TestJsonUtil.class); + + @SuppressWarnings("unchecked") + @Test + public void testJsonConversion() throws Exception { + String json1 = FileUtil.read("/test1.json"); + Map<String, Object> data1 = (Map<String, Object>) JsonUtil.jsonToData(json1); + String convJson1 = JsonUtil.dataToJson(data1); + log.info("Converted JSON:\n" + convJson1); + Map<String, Object> convData1 = (Map<String, Object>) JsonUtil.jsonToData(convJson1); + Assert.assertEquals(data1, convData1); + } +} diff --git a/lib/doorman/src/test/resources/it/tc1/result.json b/lib/doorman/src/test/resources/it/tc1/result.json new file mode 100644 index 000000000..3b048827c --- /dev/null +++ b/lib/doorman/src/test/resources/it/tc1/result.json @@ -0,0 +1,31 @@ +{ + "queue_list": [ + { + "queue_type": "QQ", + "queue_id": "ALL", + "check_sequence_list": [ + [ + "STATUS | req-1 | ARRIVED", + "ACTION | req-1 | PROCESS", + "STATUS | req-1 | PROCESSING_SYNC", + "STATUS | req-2 | ARRIVED", + "ACTION | req-2 | HOLD | 60", + "STATUS | req-2 | IN_QUEUE", + "STATUS | req-3 | ARRIVED", + "ACTION | req-3 | HOLD | 60", + "STATUS | req-3 | IN_QUEUE", + "ACTION | req-1 | RETURN_COMPLETE | PROCESSED", + "ACTION | req-2 | PROCESS", + "STATUS | req-1 | COMPLETED", + "STATUS | req-2 | PROCESSING_SYNC", + "ACTION | req-2 | RETURN_COMPLETE | PROCESSED", + "ACTION | req-3 | PROCESS", + "STATUS | req-2 | COMPLETED", + "STATUS | req-3 | PROCESSING_SYNC", + "ACTION | req-3 | RETURN_COMPLETE | PROCESSED", + "STATUS | req-3 | COMPLETED" + ] + ] + } + ] +} diff --git a/lib/doorman/src/test/resources/it/tc1/test.json b/lib/doorman/src/test/resources/it/tc1/test.json new file mode 100644 index 000000000..0d96ce8dd --- /dev/null +++ b/lib/doorman/src/test/resources/it/tc1/test.json @@ -0,0 +1,67 @@ +{ + "handler_list": [ + { + "queue_type": "QQ", + "handler_class": "Handler" + } + ], + "request_list": [ + { + "request_param": { + "request_id": "req-1", + "queue_type": "QQ", + "queue_id": "ALL", + "http_method": "POST" + }, + "request_body": { + "siid": "test-siid-1" + }, + "response_param": { + "http_code": 200 + }, + "response_body": { + "status": "success" + }, + "start_time": 100, + "process_time": 5000 + }, + { + "request_param": { + "request_id": "req-2", + "queue_type": "QQ", + "queue_id": "ALL", + "http_method": "POST" + }, + "request_body": { + "siid": "test-siid-2" + }, + "response_param": { + "http_code": 200 + }, + "response_body": { + "status": "success" + }, + "start_time": 500, + "process_time": 5000 + }, + { + "request_param": { + "request_id": "req-3", + "queue_type": "QQ", + "queue_id": "ALL", + "http_method": "POST" + }, + "request_body": { + "siid": "test-siid-3" + }, + "response_param": { + "http_code": 200 + }, + "response_body": { + "status": "success" + }, + "start_time": 900, + "process_time": 5000 + } + ] +} diff --git a/lib/doorman/src/test/resources/it/tc2/request_entity1.txt b/lib/doorman/src/test/resources/it/tc2/request_entity1.txt new file mode 100644 index 000000000..abda4f381 --- /dev/null +++ b/lib/doorman/src/test/resources/it/tc2/request_entity1.txt @@ -0,0 +1,4 @@ +{ + "operation": "update", + "entity_id": "entity1" +} diff --git a/lib/doorman/src/test/resources/it/tc2/request_entity2.txt b/lib/doorman/src/test/resources/it/tc2/request_entity2.txt new file mode 100644 index 000000000..eb6ef6d92 --- /dev/null +++ b/lib/doorman/src/test/resources/it/tc2/request_entity2.txt @@ -0,0 +1,4 @@ +{ + "operation": "update", + "entity_id": "entity2" +} diff --git a/lib/doorman/src/test/resources/it/tc2/request_other.txt b/lib/doorman/src/test/resources/it/tc2/request_other.txt new file mode 100644 index 000000000..7763aeb7d --- /dev/null +++ b/lib/doorman/src/test/resources/it/tc2/request_other.txt @@ -0,0 +1,4 @@ +{ + "operation": "add", + "entity_id": "entity5" +} diff --git a/lib/doorman/src/test/resources/it/tc2/response.txt b/lib/doorman/src/test/resources/it/tc2/response.txt new file mode 100644 index 000000000..21aad5aed --- /dev/null +++ b/lib/doorman/src/test/resources/it/tc2/response.txt @@ -0,0 +1,3 @@ +{ + "status": "success", +} diff --git a/lib/doorman/src/test/resources/it/tc2/result.json b/lib/doorman/src/test/resources/it/tc2/result.json new file mode 100644 index 000000000..d2df2ea7b --- /dev/null +++ b/lib/doorman/src/test/resources/it/tc2/result.json @@ -0,0 +1,94 @@ +{ + "queue_list": [ + { + "queue_type": "Cluster", + "queue_id": "Cluster", + "check_sequence_list": [ + [ + "STATUS | req-entity1-1 | ARRIVED", + "ACTION | req-entity1-1 | HOLD | 20", + "STATUS | req-entity1-1 | IN_QUEUE", + "STATUS | req-entity1-2 | ARRIVED", + "ACTION | req-entity1-1 | RETURN_COMPLETE | SKIPPED", + "ACTION | req-entity1-2 | HOLD | 20", + "STATUS | req-entity1-2 | IN_QUEUE", + "STATUS | req-entity1-3 | ARRIVED", + "ACTION | req-entity1-2 | RETURN_COMPLETE | SKIPPED", + "ACTION | req-entity1-3 | HOLD | 20", + "STATUS | req-entity1-3 | IN_QUEUE", + "STATUS | req-entity1-4 | ARRIVED", + "ACTION | req-entity1-3 | RETURN_COMPLETE | SKIPPED", + "ACTION | req-entity1-4 | HOLD | 20", + "STATUS | req-entity1-4 | IN_QUEUE", + "ACTION | req-entity1-4 | PROCESS", + "STATUS | req-entity1-4 | PROCESSING_SYNC", + "ACTION | req-entity1-4 | RETURN_COMPLETE | PROCESSED", + "STATUS | req-entity1-4 | COMPLETED" + ], + [ + "STATUS | req-entity2-1 | ARRIVED", + "ACTION | req-entity2-1 | HOLD | 20", + "STATUS | req-entity2-1 | IN_QUEUE", + "STATUS | req-entity2-2 | ARRIVED", + "ACTION | req-entity2-1 | RETURN_COMPLETE | SKIPPED", + "ACTION | req-entity2-2 | HOLD | 20", + "STATUS | req-entity2-2 | IN_QUEUE", + "STATUS | req-entity2-3 | ARRIVED", + "ACTION | req-entity2-2 | RETURN_COMPLETE | SKIPPED", + "ACTION | req-entity2-3 | HOLD | 20", + "STATUS | req-entity2-3 | IN_QUEUE", + "STATUS | req-entity2-4 | ARRIVED", + "ACTION | req-entity2-3 | RETURN_COMPLETE | SKIPPED", + "ACTION | req-entity2-4 | HOLD | 20", + "STATUS | req-entity2-4 | IN_QUEUE", + "ACTION | req-entity2-4 | PROCESS", + "STATUS | req-entity2-4 | PROCESSING_SYNC", + "ACTION | req-entity2-4 | RETURN_COMPLETE | PROCESSED", + "STATUS | req-entity2-4 | COMPLETED" + ], + [ + "STATUS | req-entity1-1 | ARRIVED", + "ACTION | req-entity1-1 | HOLD | 20", + "STATUS | req-entity1-1 | IN_QUEUE", + "ACTION | req-entity1-1 | RETURN_COMPLETE | SKIPPED", + "STATUS | req-entity1-1 | COMPLETED" + ], + [ + "STATUS | req-entity1-2 | ARRIVED", + "ACTION | req-entity1-2 | HOLD | 20", + "STATUS | req-entity1-2 | IN_QUEUE", + "ACTION | req-entity1-2 | RETURN_COMPLETE | SKIPPED", + "STATUS | req-entity1-2 | COMPLETED" + ], + [ + "STATUS | req-entity1-3 | ARRIVED", + "ACTION | req-entity1-3 | HOLD | 20", + "STATUS | req-entity1-3 | IN_QUEUE", + "ACTION | req-entity1-3 | RETURN_COMPLETE | SKIPPED", + "STATUS | req-entity1-3 | COMPLETED" + ], + [ + "STATUS | req-entity2-1 | ARRIVED", + "ACTION | req-entity2-1 | HOLD | 20", + "STATUS | req-entity2-1 | IN_QUEUE", + "ACTION | req-entity2-1 | RETURN_COMPLETE | SKIPPED", + "STATUS | req-entity2-1 | COMPLETED" + ], + [ + "STATUS | req-entity2-2 | ARRIVED", + "ACTION | req-entity2-2 | HOLD | 20", + "STATUS | req-entity2-2 | IN_QUEUE", + "ACTION | req-entity2-2 | RETURN_COMPLETE | SKIPPED", + "STATUS | req-entity2-2 | COMPLETED" + ], + [ + "STATUS | req-entity2-3 | ARRIVED", + "ACTION | req-entity2-3 | HOLD | 20", + "STATUS | req-entity2-3 | IN_QUEUE", + "ACTION | req-entity2-3 | RETURN_COMPLETE | SKIPPED", + "STATUS | req-entity2-3 | COMPLETED" + ] + ] + } + ] +} diff --git a/lib/doorman/src/test/resources/it/tc2/result2.json b/lib/doorman/src/test/resources/it/tc2/result2.json new file mode 100644 index 000000000..551f1ae17 --- /dev/null +++ b/lib/doorman/src/test/resources/it/tc2/result2.json @@ -0,0 +1,76 @@ +{ + "queue_list": [ + { + "queue_type": "Cluster", + "queue_id": "Cluster", + "check_sequence_list": [ + [ + "STATUS | req-entity1-1 | ARRIVED", + "ACTION | req-entity1-1 | HOLD | 20", + "STATUS | req-entity1-1 | IN_QUEUE", + "STATUS | req-other-1 | ARRIVED", + "ACTION | req-other-1 | PROCESS", + "STATUS | req-other-1 | PROCESSING_SYNC", + "STATUS | req-entity2-1 | ARRIVED", + "ACTION | req-entity2-1 | HOLD | 20", + "STATUS | req-entity2-1 | IN_QUEUE", + "STATUS | req-other-2 | ARRIVED", + "ACTION | req-other-2 | PROCESS", + "STATUS | req-other-2 | PROCESSING_SYNC", + "STATUS | req-entity1-2 | ARRIVED", + "ACTION | req-entity1-1 | RETURN_COMPLETE | SKIPPED", + "ACTION | req-entity1-2 | HOLD | 20", + "STATUS | req-entity1-2 | IN_QUEUE", + "STATUS | req-entity2-2 | ARRIVED", + "ACTION | req-entity2-1 | RETURN_COMPLETE | SKIPPED", + "ACTION | req-entity2-2 | HOLD | 20", + "STATUS | req-entity2-2 | IN_QUEUE", + "STATUS | req-entity1-3 | ARRIVED", + "ACTION | req-entity1-2 | RETURN_COMPLETE | SKIPPED", + "ACTION | req-entity1-3 | HOLD | 20", + "STATUS | req-entity1-3 | IN_QUEUE", + "STATUS | req-other-3 | ARRIVED", + "ACTION | req-other-3 | PROCESS", + "STATUS | req-other-3 | PROCESSING_SYNC", + "STATUS | req-entity2-3 | ARRIVED", + "ACTION | req-entity2-2 | RETURN_COMPLETE | SKIPPED", + "ACTION | req-entity2-3 | HOLD | 20", + "STATUS | req-entity2-3 | IN_QUEUE", + "STATUS | req-entity1-4 | ARRIVED", + "ACTION | req-entity1-3 | RETURN_COMPLETE | SKIPPED", + "ACTION | req-entity1-4 | HOLD | 20", + "STATUS | req-entity1-4 | IN_QUEUE", + "STATUS | req-entity2-4 | ARRIVED", + "ACTION | req-entity2-3 | RETURN_COMPLETE | SKIPPED", + "ACTION | req-entity2-4 | HOLD | 20", + "STATUS | req-entity2-4 | IN_QUEUE", + "STATUS | req-other-4 | ARRIVED", + "ACTION | req-other-4 | PROCESS", + "STATUS | req-other-4 | PROCESSING_SYNC", + "STATUS | req-entity1-1 | COMPLETED", + "ACTION | req-other-1 | RETURN_COMPLETE | PROCESSED", + "STATUS | req-other-1 | COMPLETED", + "STATUS | req-entity2-1 | COMPLETED", + "ACTION | req-other-2 | RETURN_COMPLETE | PROCESSED", + "STATUS | req-other-2 | COMPLETED", + "STATUS | req-entity1-2 | COMPLETED", + "STATUS | req-entity2-2 | COMPLETED", + "STATUS | req-entity1-3 | COMPLETED", + "ACTION | req-other-3 | RETURN_COMPLETE | PROCESSED", + "STATUS | req-other-3 | COMPLETED", + "STATUS | req-entity2-3 | COMPLETED", + "ACTION | req-other-4 | RETURN_COMPLETE | PROCESSED", + "STATUS | req-other-4 | COMPLETED", + "ACTION | req-entity1-4 | PROCESS", + "ACTION | req-entity2-4 | PROCESS", + "STATUS | req-entity1-4 | PROCESSING_SYNC", + "STATUS | req-entity2-4 | PROCESSING_SYNC", + "ACTION | req-entity1-4 | RETURN_COMPLETE | PROCESSED", + "STATUS | req-entity1-4 | COMPLETED", + "ACTION | req-entity2-4 | RETURN_COMPLETE | PROCESSED", + "STATUS | req-entity2-4 | COMPLETED" + ] + ] + } + ] +} diff --git a/lib/doorman/src/test/resources/it/tc2/test.json b/lib/doorman/src/test/resources/it/tc2/test.json new file mode 100644 index 000000000..82a0546ac --- /dev/null +++ b/lib/doorman/src/test/resources/it/tc2/test.json @@ -0,0 +1,202 @@ +{ + "handler_list": [ + { + "queue_type": "Cluster", + "handler_class": "Handler" + } + ], + "request_list": [ + { + "request_param": { + "request_id": "req-entity1-1", + "sequence_number": 1, + "queue_type": "Cluster", + "queue_id": "Cluster", + "http_method": "POST" + }, + "request_body_file": "request_entity1.txt", + "response_param": { + "http_code": 200 + }, + "response_body_file": "response.txt", + "start_time": 100, + "process_time": 5000 + }, + { + "request_param": { + "request_id": "req-other-1", + "sequence_number": 2, + "queue_type": "Cluster", + "queue_id": "Cluster", + "http_method": "POST" + }, + "request_body_file": "request_other.txt", + "response_param": { + "http_code": 200 + }, + "response_body_file": "response.txt", + "start_time": 200, + "process_time": 5000 + }, + { + "request_param": { + "request_id": "req-entity2-1", + "sequence_number": 3, + "queue_type": "Cluster", + "queue_id": "Cluster", + "http_method": "POST" + }, + "request_body_file": "request_entity2.txt", + "response_param": { + "http_code": 200 + }, + "response_body_file": "response.txt", + "start_time": 300, + "process_time": 5000 + }, + { + "request_param": { + "request_id": "req-other-2", + "sequence_number": 4, + "queue_type": "Cluster", + "queue_id": "Cluster", + "http_method": "POST" + }, + "request_body_file": "request_other.txt", + "response_param": { + "http_code": 200 + }, + "response_body_file": "response.txt", + "start_time": 400, + "process_time": 5000 + }, + { + "request_param": { + "request_id": "req-entity1-2", + "sequence_number": 5, + "queue_type": "Cluster", + "queue_id": "Cluster", + "http_method": "POST" + }, + "request_body_file": "request_entity1.txt", + "response_param": { + "http_code": 200 + }, + "response_body_file": "response.txt", + "start_time": 500, + "process_time": 5000 + }, + { + "request_param": { + "request_id": "req-entity2-2", + "sequence_number": 6, + "queue_type": "Cluster", + "queue_id": "Cluster", + "http_method": "POST" + }, + "request_body_file": "request_entity2.txt", + "response_param": { + "http_code": 200 + }, + "response_body_file": "response.txt", + "start_time": 600, + "process_time": 5000 + }, + { + "request_param": { + "request_id": "req-entity1-3", + "sequence_number": 7, + "queue_type": "Cluster", + "queue_id": "Cluster", + "http_method": "POST" + }, + "request_body_file": "request_entity1.txt", + "response_param": { + "http_code": 200 + }, + "response_body_file": "response.txt", + "start_time": 700, + "process_time": 5000 + }, + { + "request_param": { + "request_id": "req-other-3", + "sequence_number": 8, + "queue_type": "Cluster", + "queue_id": "Cluster", + "http_method": "POST" + }, + "request_body_file": "request_other.txt", + "response_param": { + "http_code": 200 + }, + "response_body_file": "response.txt", + "start_time": 800, + "process_time": 5000 + }, + { + "request_param": { + "request_id": "req-entity2-3", + "sequence_number": 9, + "queue_type": "Cluster", + "queue_id": "Cluster", + "http_method": "POST" + }, + "request_body_file": "request_entity2.txt", + "response_param": { + "http_code": 200 + }, + "response_body_file": "response.txt", + "start_time": 900, + "process_time": 5000 + }, + { + "request_param": { + "request_id": "req-entity1-4", + "sequence_number": 10, + "queue_type": "Cluster", + "queue_id": "Cluster", + "http_method": "POST" + }, + "request_body_file": "request_entity1.txt", + "response_param": { + "http_code": 200 + }, + "response_body_file": "response.txt", + "start_time": 3100, + "process_time": 5000 + }, + { + "request_param": { + "request_id": "req-entity2-4", + "sequence_number": 11, + "queue_type": "Cluster", + "queue_id": "Cluster", + "http_method": "POST" + }, + "request_body_file": "request_entity2.txt", + "response_param": { + "http_code": 200 + }, + "response_body_file": "response.txt", + "start_time": 3200, + "process_time": 5000 + }, + { + "request_param": { + "request_id": "req-other-4", + "sequence_number": 12, + "queue_type": "Cluster", + "queue_id": "Cluster", + "http_method": "POST" + }, + "request_body_file": "request_other.txt", + "response_param": { + "http_code": 200 + }, + "response_body_file": "response.txt", + "start_time": 3300, + "process_time": 5000 + } + ] +} diff --git a/lib/doorman/src/test/resources/schema.sql b/lib/doorman/src/test/resources/schema.sql new file mode 100644 index 000000000..d110e64ca --- /dev/null +++ b/lib/doorman/src/test/resources/schema.sql @@ -0,0 +1,58 @@ +CREATE TABLE IF NOT EXISTS `resource_lock` ( + `resource_lock_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `resource_name` varchar(256), + `lock_holder` varchar(100) NOT NULL, + `lock_count` smallint(6) NOT NULL, + `lock_time` datetime NOT NULL, + `expiration_time` datetime NOT NULL, + PRIMARY KEY (`resource_lock_id`), + UNIQUE KEY `IX1_RESOURCE_LOCK` (`resource_name`) +); + +CREATE TABLE IF NOT EXISTS `message` ( + `message_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `ext_message_id` varchar(64), + `request_param` TEXT NOT NULL, + `request_body` TEXT NOT NULL, + `arrived_timestamp` datetime NOT NULL, + `started_timestamp` datetime, + `completed_timestamp` datetime, + `response_timestamp` datetime, + `response_param` TEXT, + `response_body` TEXT, + `resolution` varchar(20), + `queue_type` varchar(64), + `queue_id` varchar(256), + PRIMARY KEY (`message_id`) +); + +CREATE INDEX IF NOT EXISTS `ix1_message` +ON message(queue_type, queue_id); + +CREATE TABLE IF NOT EXISTS `message_status` ( + `message_status_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `message_id` bigint(20) unsigned NOT NULL REFERENCES message(message_id), + `status` varchar(20) NOT NULL, + `status_timestamp` datetime NOT NULL, + PRIMARY KEY (`message_status_id`) +); + +CREATE INDEX IF NOT EXISTS `ix1_message_status` +ON message_status(message_id); + +CREATE TABLE IF NOT EXISTS `message_action` ( + `message_action_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `message_id` bigint(20) unsigned NOT NULL REFERENCES message(message_id), + `action` varchar(20) NOT NULL, + `action_status` varchar(20) NOT NULL, + `resolution` varchar(20), + `action_timestamp` datetime NOT NULL, + `done_timestamp` datetime, + `hold_time` int, + `response_param` TEXT, + `response_body` TEXT, + PRIMARY KEY (`message_action_id`) +); + +CREATE INDEX IF NOT EXISTS `ix1_message_action` +ON message_action(message_id); diff --git a/lib/doorman/src/test/resources/test1.json b/lib/doorman/src/test/resources/test1.json new file mode 100644 index 000000000..820675c7e --- /dev/null +++ b/lib/doorman/src/test/resources/test1.json @@ -0,0 +1,36 @@ +{ + "struct": { + "simple-list": [ + "val1", + "val2", + null, + false, + true, + 123, + 0.123, + 1.2334e-5 + ], + "list": [ + { + "name1": "val1", + "name2": "val2" + }, + { + "name1": "val1", + "name2": "val2" + }, + { + "integer": 34, + "fraction": 0.2145, + "exponent": 6.61789e+0, + "boolean": true + }, + [ "val1", "val2", "blah \"string\" \t\t\t\n" ] ] + }, + "types": { + "integer": 34, + "fraction": 0.2145, + "exponent": 6.61789e+0, + "boolean": true + } +} diff --git a/lib/rlock/pom.xml b/lib/rlock/pom.xml new file mode 100644 index 000000000..51b778bad --- /dev/null +++ b/lib/rlock/pom.xml @@ -0,0 +1,43 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.ccsdk.features</groupId> + <artifactId>ccsdk-features</artifactId> + <version>1.0.0-SNAPSHOT</version> + </parent> + + <groupId>org.onap.ccsdk.features.lib.rlock</groupId> + <artifactId>rlock</artifactId> + + <description>Resource Lock - Distributed locking feature using database</description> + + <dependencies> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java new file mode 100644 index 000000000..17817fe1d --- /dev/null +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelper.java @@ -0,0 +1,14 @@ +package org.onap.ccsdk.features.lib.rlock; + +import java.util.Collection; + +public interface LockHelper { + + void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */); + + void unlock(String resourceName, boolean force); + + void lock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */); + + void unlock(Collection<String> resourceNameList, boolean force); +} diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java new file mode 100644 index 000000000..666fb6af5 --- /dev/null +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/LockHelperImpl.java @@ -0,0 +1,173 @@ +package org.onap.ccsdk.features.lib.rlock; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import javax.sql.DataSource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LockHelperImpl implements LockHelper { + + private static final Logger log = LoggerFactory.getLogger(LockHelperImpl.class); + + private int retryCount = 20; + private int lockWait = 5; // Seconds + + private DataSource dataSource; + + @Override + public void lock(String resourceName, String lockRequester, int lockTimeout /* Seconds */) { + lock(Collections.singleton(resourceName), lockRequester, lockTimeout); + } + + @Override + public void unlock(String resourceName, boolean force) { + unlock(Collections.singleton(resourceName), force); + } + + @Override + public void lock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */) { + for (int i = 0; true; i++) { + try { + tryLock(resourceNameList, lockRequester, lockTimeout); + log.info("Resources locked: " + resourceNameList); + return; + } catch (ResourceLockedException e) { + if (i > retryCount) { + throw e; + } + try { + Thread.sleep(lockWait * 1000L); + } catch (InterruptedException ex) { + } + } + } + } + + @Override + public void unlock(Collection<String> lockNames, boolean force) { + if (lockNames == null || lockNames.size() == 0) { + return; + } + + try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) { + try { + for (String name : lockNames) { + ResourceLock l = resourceLockDao.getByResourceName(name); + if (l != null) { + if (force || l.lockCount == 1) { + resourceLockDao.delete(l.id); + } else { + resourceLockDao.decrementLockCount(l.id); + } + } + } + resourceLockDao.commit(); + log.info("Resources unlocked: " + lockNames); + } catch (Exception e) { + resourceLockDao.rollback(); + } + } + } + + public void tryLock(Collection<String> resourceNameList, String lockRequester, int lockTimeout /* Seconds */) { + if (resourceNameList == null || resourceNameList.isEmpty()) { + return; + } + + lockRequester = generateLockRequester(lockRequester, 100); + + // First check if all requested records are available to lock + + Date now = new Date(); + + try (ResourceLockDao resourceLockDao = new ResourceLockDao(dataSource)) { + try { + List<ResourceLock> dbLockList = new ArrayList<>(); + List<String> insertLockNameList = new ArrayList<>(); + for (String name : resourceNameList) { + ResourceLock l = resourceLockDao.getByResourceName(name); + + boolean canLock = l == null || now.getTime() > l.expirationTime.getTime() || lockRequester != null && lockRequester.equals(l.lockHolder) || l.lockCount <= 0; + if (!canLock) { + throw new ResourceLockedException(l.resourceName, l.lockHolder, lockRequester); + } + + if (l != null) { + if (now.getTime() > l.expirationTime.getTime() || l.lockCount <= 0) { + l.lockCount = 0; + } + dbLockList.add(l); + } else { + insertLockNameList.add(name); + } + } + + // Update the lock info in DB + for (ResourceLock l : dbLockList) { + resourceLockDao.update(l.id, lockRequester, now, new Date(now.getTime() + lockTimeout * 1000), l.lockCount + 1); + } + + // Insert records for those that are not yet there + for (String lockName : insertLockNameList) { + ResourceLock l = new ResourceLock(); + l.resourceName = lockName; + l.lockHolder = lockRequester; + l.lockTime = now; + l.expirationTime = new Date(now.getTime() + lockTimeout * 1000); + l.lockCount = 1; + + try { + resourceLockDao.add(l); + } catch (Exception e) { + throw new ResourceLockedException(l.resourceName, "unknown", lockRequester); + } + } + + resourceLockDao.commit(); + + } catch (Exception e) { + resourceLockDao.rollback(); + throw e; + } + } + } + + private static String generateLockRequester(String name, int maxLength) { + if (name == null) { + name = ""; + } + int l1 = name.length(); + String tname = Thread.currentThread().getName(); + int l2 = tname.length(); + if (l1 + l2 + 1 > maxLength) { + int maxl1 = maxLength / 2; + if (l1 > maxl1) { + name = name.substring(0, maxl1); + l1 = maxl1; + } + int maxl2 = maxLength - l1 - 1; + if (l2 > maxl2) { + tname = tname.substring(0, 6) + "..." + tname.substring(l2 - maxl2 + 9); + } + } + return tname + '-' + name; + } + + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + + public void setLockWait(int lockWait) { + this.lockWait = lockWait; + } + + public void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + } +} diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java new file mode 100644 index 000000000..a7e966855 --- /dev/null +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLock.java @@ -0,0 +1,13 @@ +package org.onap.ccsdk.features.lib.rlock; + +import java.util.Date; + +public class ResourceLock { + + public long id; + public String resourceName; + public String lockHolder; + public int lockCount; + public Date lockTime; + public Date expirationTime; +} diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java new file mode 100644 index 000000000..4833bb28e --- /dev/null +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockDao.java @@ -0,0 +1,122 @@ +package org.onap.ccsdk.features.lib.rlock; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Date; + +import javax.sql.DataSource; + +public class ResourceLockDao implements AutoCloseable { + + private Connection con; + + public ResourceLockDao(DataSource dataSource) { + try { + con = dataSource.getConnection(); + con.setAutoCommit(false); + } catch (SQLException e) { + throw new RuntimeException("Error getting DB connection: " + e.getMessage(), e); + } + } + + public void add(ResourceLock l) { + String sql = "INSERT INTO RESOURCE_LOCK (resource_name, lock_holder, lock_count, lock_time, expiration_time)\n" + "VALUES (?, ?, ?, ?, ?)"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setString(1, l.resourceName); + ps.setString(2, l.lockHolder); + ps.setInt(3, l.lockCount); + ps.setTimestamp(4, new Timestamp(l.lockTime.getTime())); + ps.setTimestamp(5, new Timestamp(l.expirationTime.getTime())); + ps.execute(); + } catch (SQLException e) { + throw new RuntimeException("Error adding lock to DB: " + e.getMessage(), e); + } + } + + public void update(long id, String lockHolder, Date lockTime, Date expirationTime, int lockCount) { + String sql = "UPDATE RESOURCE_LOCK SET lock_holder = ?, lock_time = ?, expiration_time = ?, lock_count = ? WHERE resource_lock_id = ?"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setString(1, lockHolder); + ps.setTimestamp(2, new Timestamp(lockTime.getTime())); + ps.setTimestamp(3, new Timestamp(expirationTime.getTime())); + ps.setInt(4, lockCount); + ps.setLong(5, id); + ps.execute(); + } catch (SQLException e) { + throw new RuntimeException("Error updating lock in DB: " + e.getMessage(), e); + } + } + + public ResourceLock getByResourceName(String resourceName) { + String sql = "SELECT * FROM RESOURCE_LOCK WHERE resource_name = ?"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setString(1, resourceName); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + ResourceLock rl = new ResourceLock(); + rl.id = rs.getLong("resource_lock_id"); + rl.resourceName = rs.getString("resource_name"); + rl.lockHolder = rs.getString("lock_holder"); + rl.lockCount = rs.getInt("lock_count"); + rl.lockTime = rs.getTimestamp("lock_time"); + rl.expirationTime = rs.getTimestamp("expiration_time"); + return rl; + } + return null; + } + } catch (SQLException e) { + throw new RuntimeException("Error reading lock from DB: " + e.getMessage(), e); + } + } + + public void delete(long id) { + String sql = "DELETE FROM RESOURCE_LOCK WHERE resource_lock_id = ?"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, id); + ps.execute(); + } catch (SQLException e) { + throw new RuntimeException("Error deleting lock from DB: " + e.getMessage(), e); + } + } + + public void decrementLockCount(long id) { + String sql = "UPDATE RESOURCE_LOCK SET lock_count = lock_count - 1 WHERE resource_lock_id = ?"; + + try (PreparedStatement ps = con.prepareStatement(sql)) { + ps.setLong(1, id); + ps.execute(); + } catch (SQLException e) { + throw new RuntimeException("Error updating lock count in DB: " + e.getMessage(), e); + } + } + + public void commit() { + try { + con.commit(); + } catch (SQLException e) { + throw new RuntimeException("Error committing DB connection: " + e.getMessage(), e); + } + } + + public void rollback() { + try { + con.rollback(); + } catch (SQLException e) { + } + } + + @Override + public void close() { + try { + con.close(); + } catch (SQLException e) { + } + } +} diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java new file mode 100644 index 000000000..7c8cfa122 --- /dev/null +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/ResourceLockedException.java @@ -0,0 +1,20 @@ +package org.onap.ccsdk.features.lib.rlock; + +public class ResourceLockedException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + private String lockName, lockHolder, lockRequester; + + public ResourceLockedException(String lockName, String lockHolder, String lockRequester) { + this.lockName = lockName; + this.lockHolder = lockHolder; + this.lockRequester = lockRequester; + } + + @Override + public String getMessage() { + return "Failed to lock [" + lockName + "] for [" + lockRequester + "]. Currently locked by [" + lockHolder + + "]."; + } +} diff --git a/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java new file mode 100644 index 000000000..ff25e16f0 --- /dev/null +++ b/lib/rlock/src/main/java/org/onap/ccsdk/features/lib/rlock/SynchronizedFunction.java @@ -0,0 +1,35 @@ +package org.onap.ccsdk.features.lib.rlock; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +public abstract class SynchronizedFunction { + + private Set<String> synchset; + private String lockRequester; + private int lockTimeout; // Seconds + private LockHelper lockHelper; + + protected SynchronizedFunction(LockHelper lockHelper, Collection<String> synchset, int lockTimeout) { + this.lockHelper = lockHelper; + this.synchset = new HashSet<String>(synchset); + this.lockRequester = generateLockRequester(); + this.lockTimeout = lockTimeout; + } + + protected abstract void _exec(); + + public void exec() { + lockHelper.lock(synchset, lockRequester, lockTimeout); + try { + _exec(); + } finally { + lockHelper.unlock(synchset, true); + } + } + + private static String generateLockRequester() { + return "SynchronizedFunction-" + (int) (Math.random() * 1000000); + } +} diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java new file mode 100644 index 000000000..9f37894c9 --- /dev/null +++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/TestLockHelper.java @@ -0,0 +1,51 @@ +package org.onap.ccsdk.features.lib.rlock; + +import org.junit.Test; +import org.onap.ccsdk.features.lib.rlock.testutils.DbUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestLockHelper { + + private static final Logger log = LoggerFactory.getLogger(TestLockHelper.class); + + @Test + public void test1() throws Exception { + LockThread t1 = new LockThread("req1"); + LockThread t2 = new LockThread("req2"); + LockThread t3 = new LockThread("req3"); + + t1.start(); + t2.start(); + t3.start(); + + t1.join(); + t2.join(); + t3.join(); + } + + private class LockThread extends Thread { + + private String requester; + + public LockThread(String requester) { + this.requester = requester; + } + + @Override + public void run() { + LockHelperImpl lockHelper = new LockHelperImpl(); + lockHelper.setDataSource(DbUtil.getDataSource()); + + lockHelper.lock("resource1", requester, 20); + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + log.warn("Thread interrupted: " + e.getMessage(), e); + } + + lockHelper.unlock("resource1", false); + } + } +} diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java new file mode 100644 index 000000000..38d4d62c1 --- /dev/null +++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/DbUtil.java @@ -0,0 +1,92 @@ +package org.onap.ccsdk.features.lib.rlock.testutils; + +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; + +import javax.sql.DataSource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DbUtil { + + private static final Logger log = LoggerFactory.getLogger(DbUtil.class); + + private static DataSource dataSource = null; + + public static synchronized DataSource getDataSource() { + if (dataSource == null) { + String url = "jdbc:h2:mem:app;DB_CLOSE_DELAY=-1"; + + dataSource = new DataSource() { + + @Override + public <T> T unwrap(Class<T> arg0) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class<?> arg0) throws SQLException { + return false; + } + + @Override + public void setLoginTimeout(int arg0) throws SQLException { + } + + @Override + public void setLogWriter(PrintWriter arg0) throws SQLException { + } + + @Override + public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { + return null; + } + + @Override + public int getLoginTimeout() throws SQLException { + return 0; + } + + @Override + public PrintWriter getLogWriter() throws SQLException { + return null; + } + + @Override + public Connection getConnection(String username, String password) throws SQLException { + return null; + } + + @Override + public Connection getConnection() throws SQLException { + return DriverManager.getConnection(url); + } + }; + + try { + String script = FileUtil.read("/schema.sql"); + + String[] sqlList = script.split(";"); + try (Connection con = dataSource.getConnection()) { + for (String sql : sqlList) { + if (!sql.trim().isEmpty()) { + sql = sql.trim(); + try (PreparedStatement ps = con.prepareStatement(sql)) { + log.info("Executing statement:\n" + sql); + ps.execute(); + } + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return dataSource; + } +} diff --git a/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java new file mode 100644 index 000000000..e51a3b082 --- /dev/null +++ b/lib/rlock/src/test/java/org/onap/ccsdk/features/lib/rlock/testutils/FileUtil.java @@ -0,0 +1,24 @@ +package org.onap.ccsdk.features.lib.rlock.testutils; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; + +public class FileUtil { + + public static String read(String fileName) throws Exception { + String ss = ""; + try (InputStream is = DbUtil.class.getResourceAsStream(fileName)) { + try (InputStreamReader isr = new InputStreamReader(is)) { + try (BufferedReader in = new BufferedReader(isr)) { + String s = in.readLine(); + while (s != null) { + ss += s + '\n'; + s = in.readLine(); + } + } + } + } + return ss; + } +} diff --git a/lib/rlock/src/test/resources/schema.sql b/lib/rlock/src/test/resources/schema.sql new file mode 100644 index 000000000..26f38f684 --- /dev/null +++ b/lib/rlock/src/test/resources/schema.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS `resource_lock` ( + `resource_lock_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `resource_name` varchar(256), + `lock_holder` varchar(100) NOT NULL, + `lock_count` smallint(6) NOT NULL, + `lock_time` datetime NOT NULL, + `expiration_time` datetime NOT NULL, + PRIMARY KEY (`resource_lock_id`), + UNIQUE KEY `IX1_RESOURCE_LOCK` (`resource_name`) +); @@ -47,6 +47,8 @@ <module>sdnr/wt</module> <module>sdnr/northbound</module> <module>aafshiro</module> + <module>lib/rlock</module> + <module>lib/doorman</module> </modules> <scm> |