aboutsummaryrefslogtreecommitdiffstats
path: root/PolicyEngineUtils/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'PolicyEngineUtils/src/main/java')
-rw-r--r--PolicyEngineUtils/src/main/java/org/onap/policy/utils/BackUpMonitor.java733
-rw-r--r--PolicyEngineUtils/src/main/java/org/onap/policy/utils/BackUpMonitorException.java44
-rw-r--r--PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusConsumer.java37
-rw-r--r--PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusPublisher.java31
-rw-r--r--PolicyEngineUtils/src/main/java/org/onap/policy/utils/PolicyUtils.java2
5 files changed, 484 insertions, 363 deletions
diff --git a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BackUpMonitor.java b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BackUpMonitor.java
index 6fff797a6..d4f111637 100644
--- a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BackUpMonitor.java
+++ b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BackUpMonitor.java
@@ -40,6 +40,7 @@ import org.onap.policy.jpa.BackUpMonitorEntity;
import org.onap.policy.std.NotificationStore;
import org.onap.policy.std.StdPDPNotification;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.github.fge.jackson.JsonLoader;
import com.github.fge.jsonpatch.JsonPatch;
@@ -47,376 +48,404 @@ import com.github.fge.jsonpatch.JsonPatchException;
import com.github.fge.jsonpatch.diff.JsonDiff;
/**
- * BackUp Monitor checks Backup Status with the Database and maintains Redundancy for Gateway Applications.
+ * BackUp Monitor checks Backup Status with the Database and maintains Redundancy for Gateway Applications.
*
*/
public class BackUpMonitor {
- private static final Logger LOGGER = Logger.getLogger(BackUpMonitor.class.getName());
- private static final int DEFAULT_PING = 15000; // Value is in milliseconds.
+ private static final Logger LOGGER = Logger.getLogger(BackUpMonitor.class.getName());
+ private static final int DEFAULT_PING = 15000; // Value is in milliseconds.
- private static BackUpMonitor instance = null;
- private static String resourceName = null;
- private static String resourceNodeName = null;
- private static String notificationRecord = null;
- private static String lastMasterNotification= null;
- private static int pingInterval = DEFAULT_PING;
- private static Boolean masterFlag = false;
- private static Object lock = new Object();
- private static Object notificationLock = new Object();
- private static BackUpHandler handler= null;
- private EntityManager em;
- private EntityManagerFactory emf;
+ private static BackUpMonitor instance = null;
+ private static String resourceName = null;
+ private static String resourceNodeName = null;
+ private static String notificationRecord = null;
+ private static String lastMasterNotification = null;
+ private static int pingInterval = DEFAULT_PING;
+ private static Boolean masterFlag = false;
+ private static Object lock = new Object();
+ private static Object notificationLock = new Object();
+ private static BackUpHandler handler = null;
+ private EntityManager em;
+ private EntityManagerFactory emf;
- /*
- * Enumeration for the Resource Node Naming. Add here if required.
- */
- public enum ResourceNode{
- BRMS,
- ASTRA
- }
+ /*
+ * Enumeration for the Resource Node Naming. Add here if required.
+ */
+ public enum ResourceNode {
+ BRMS, ASTRA
+ }
- private BackUpMonitor(String resourceNodeName, String resourceName, Properties properties, BackUpHandler handler) throws Exception{
- if(instance == null){
- instance = this;
- }
- BackUpMonitor.resourceNodeName = resourceNodeName;
- BackUpMonitor.resourceName = resourceName;
- BackUpMonitor.handler = handler;
- // Create Persistence Entity
- properties.setProperty(PersistenceUnitProperties.ECLIPSELINK_PERSISTENCE_XML, "META-INF/persistencePU.xml");
- emf = Persistence.createEntityManagerFactory("PolicyEngineUtils", properties);
- if(emf==null){
- LOGGER.error("Unable to create Entity Manger Factory ");
- throw new Exception("Unable to create Entity Manger Factory");
- }
- em = emf.createEntityManager();
+ private BackUpMonitor(String resourceNodeName, String resourceName, Properties properties, BackUpHandler handler)
+ throws BackUpMonitorException {
+ if (instance == null) {
+ instance = this;
+ }
+ BackUpMonitor.resourceNodeName = resourceNodeName;
+ BackUpMonitor.resourceName = resourceName;
+ BackUpMonitor.handler = handler;
+ // Create Persistence Entity
+ properties.setProperty(PersistenceUnitProperties.ECLIPSELINK_PERSISTENCE_XML, "META-INF/persistencePU.xml");
+ emf = Persistence.createEntityManagerFactory("PolicyEngineUtils", properties);
+ if (emf == null) {
+ LOGGER.error("Unable to create Entity Manger Factory ");
+ throw new BackUpMonitorException("Unable to create Entity Manger Factory");
+ }
+ em = emf.createEntityManager();
- // Check Database if this is Master or Slave.
- checkDataBase();
+ // Check Database if this is Master or Slave.
+ checkDataBase();
- // Start thread.
- Thread t = new Thread(new BMonitor());
- t.start();
- }
+ // Start thread.
+ Thread t = new Thread(new BMonitor());
+ t.start();
+ }
- /**
- * Gets the BackUpMonitor Instance if given proper resourceName and properties. Else returns null.
- *
- * @param resourceNodeName String format of the Resource Node to which the resource Belongs to.
- * @param resourceName String format of the ResourceName. This needs to be Unique.
- * @param properties Properties format of the properties file.
- * @return BackUpMonitor instance.
- */
- public static synchronized BackUpMonitor getInstance(String resourceNodeName, String resourceName, Properties properties, BackUpHandler handler) throws Exception {
- if(resourceNodeName==null || resourceNodeName.trim().equals("") ||resourceName==null|| resourceName.trim().equals("") || properties == null || handler==null){
- LOGGER.error("Error while getting Instance. Please check resourceName and/or properties file");
- return null;
- }else if((resourceNodeName.equals(ResourceNode.ASTRA.toString()) || resourceNodeName.equals(ResourceNode.BRMS.toString())) && validate(properties) && instance==null){
- LOGGER.info("Creating Instance of BackUpMonitor");
- instance = new BackUpMonitor(resourceNodeName, resourceName, properties, handler);
- }
- return instance;
- }
+ /**
+ * Gets the BackUpMonitor Instance if given proper resourceName and properties. Else returns null.
+ *
+ * @param resourceNodeName
+ * String format of the Resource Node to which the resource Belongs to.
+ * @param resourceName
+ * String format of the ResourceName. This needs to be Unique.
+ * @param properties
+ * Properties format of the properties file.
+ * @return BackUpMonitor instance.
+ */
+ public static synchronized BackUpMonitor getInstance(String resourceNodeName, String resourceName,
+ Properties properties, BackUpHandler handler) throws BackUpMonitorException {
+ if (resourceNodeName == null || resourceNodeName.trim().equals("") || resourceName == null
+ || resourceName.trim().equals("") || properties == null || handler == null) {
+ LOGGER.error("Error while getting Instance. Please check resourceName and/or properties file");
+ return null;
+ } else if ((resourceNodeName.equals(ResourceNode.ASTRA.toString())
+ || resourceNodeName.equals(ResourceNode.BRMS.toString())) && validate(properties) && instance == null) {
+ LOGGER.info("Creating Instance of BackUpMonitor");
+ instance = new BackUpMonitor(resourceNodeName, resourceName, properties, handler);
+ }
+ return instance;
+ }
- // This is to validate given Properties with required values.
- private static Boolean validate(Properties properties){
- if(properties.getProperty("javax.persistence.jdbc.driver")==null ||properties.getProperty("javax.persistence.jdbc.driver").trim().equals("")){
- LOGGER.error("javax.persistence.jdbc.driver property is empty");
- return false;
- }
- if(properties.getProperty("javax.persistence.jdbc.url")==null || properties.getProperty("javax.persistence.jdbc.url").trim().equals("")){
- LOGGER.error("javax.persistence.jdbc.url property is empty");
- return false;
- }
- if(properties.getProperty("javax.persistence.jdbc.user")==null || properties.getProperty("javax.persistence.jdbc.user").trim().equals("")){
- LOGGER.error("javax.persistence.jdbc.user property is empty");
- return false;
- }
- if(properties.getProperty("javax.persistence.jdbc.password")==null || properties.getProperty("javax.persistence.jdbc.password").trim().equals("")){
- LOGGER.error("javax.persistence.jdbc.password property is empty");
- return false;
- }
- if(properties.getProperty("ping_interval")==null || properties.getProperty("ping_interval").trim().equals("")){
- LOGGER.info("ping_interval property not specified. Taking default value");
- }else{
- try{
- pingInterval = Integer.parseInt(properties.getProperty("ping_interval").trim());
- }catch(NumberFormatException e){
- LOGGER.warn("Ignored invalid proeprty ping_interval. Taking default value: " + pingInterval);
- pingInterval = DEFAULT_PING;
- }
- }
- return true;
- }
+ // This is to validate given Properties with required values.
+ private static Boolean validate(Properties properties) {
+ if (properties.getProperty("javax.persistence.jdbc.driver") == null
+ || properties.getProperty("javax.persistence.jdbc.driver").trim().equals("")) {
+ LOGGER.error("javax.persistence.jdbc.driver property is empty");
+ return false;
+ }
+ if (properties.getProperty("javax.persistence.jdbc.url") == null
+ || properties.getProperty("javax.persistence.jdbc.url").trim().equals("")) {
+ LOGGER.error("javax.persistence.jdbc.url property is empty");
+ return false;
+ }
+ if (properties.getProperty("javax.persistence.jdbc.user") == null
+ || properties.getProperty("javax.persistence.jdbc.user").trim().equals("")) {
+ LOGGER.error("javax.persistence.jdbc.user property is empty");
+ return false;
+ }
+ if (properties.getProperty("javax.persistence.jdbc.password") == null
+ || properties.getProperty("javax.persistence.jdbc.password").trim().equals("")) {
+ LOGGER.error("javax.persistence.jdbc.password property is empty");
+ return false;
+ }
+ if (properties.getProperty("ping_interval") == null
+ || properties.getProperty("ping_interval").trim().equals("")) {
+ LOGGER.info("ping_interval property not specified. Taking default value");
+ } else {
+ try {
+ pingInterval = Integer.parseInt(properties.getProperty("ping_interval").trim());
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Ignored invalid proeprty ping_interval. Taking default value: " + pingInterval);
+ pingInterval = DEFAULT_PING;
+ }
+ }
+ return true;
+ }
- // Sets the Flag for masterFlag to either True or False.
- private static void setFlag(Boolean flag){
- synchronized (lock) {
- masterFlag = flag;
- }
- }
+ // Sets the Flag for masterFlag to either True or False.
+ private static void setFlag(Boolean flag) {
+ synchronized (lock) {
+ masterFlag = flag;
+ }
+ }
- /**
- * Gets the Boolean value of Master(True) or Slave mode (False)
- *
- * @return Boolean flag which if True means that the operation needs to be performed(Master mode) or if false the operation is in slave mode.
- */
- public Boolean getFlag(){
- synchronized (lock) {
- return masterFlag;
- }
- }
+ /**
+ * Gets the Boolean value of Master(True) or Slave mode (False)
+ *
+ * @return Boolean flag which if True means that the operation needs to be performed(Master mode) or if false the
+ * operation is in slave mode.
+ */
+ public Boolean getFlag() {
+ synchronized (lock) {
+ return masterFlag;
+ }
+ }
- // BackUpMonitor Thread
- private class BMonitor implements Runnable{
- @Override
- public void run() {
- LOGGER.info("Starting BackUpMonitor Thread.. ");
- while(true){
- try {
- TimeUnit.MILLISECONDS.sleep(pingInterval);
- checkDataBase();
- } catch (Exception e) {
- LOGGER.error("Error during Thread execution " + e.getMessage());
- }
- }
- }
- }
+ // BackUpMonitor Thread
+ private class BMonitor implements Runnable {
+ @Override
+ public void run() {
+ LOGGER.info("Starting BackUpMonitor Thread.. ");
+ while (true) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(pingInterval);
+ checkDataBase();
+ } catch (Exception e) {
+ LOGGER.error("Error during Thread execution " + e.getMessage(), e);
+ }
+ }
+ }
+ }
- // Set Master
- private static BackUpMonitorEntity setMaster(BackUpMonitorEntity bMEntity){
- bMEntity.setFlag("MASTER");
- setFlag(true);
- return bMEntity;
- }
+ // Set Master
+ private static BackUpMonitorEntity setMaster(BackUpMonitorEntity bMEntity) {
+ bMEntity.setFlag("MASTER");
+ setFlag(true);
+ return bMEntity;
+ }
- // Set Slave
- private static BackUpMonitorEntity setSlave(BackUpMonitorEntity bMEntity){
- bMEntity.setFlag("SLAVE");
- setFlag(false);
- return bMEntity;
- }
+ // Set Slave
+ private static BackUpMonitorEntity setSlave(BackUpMonitorEntity bMEntity) {
+ bMEntity.setFlag("SLAVE");
+ setFlag(false);
+ return bMEntity;
+ }
- // Check Database and set the Flag.
- private void checkDataBase() throws Exception {
- EntityTransaction et = em.getTransaction();
- notificationRecord = PolicyUtils.objectToJsonString(NotificationStore.getNotificationRecord());
- // Clear Cache.
- LOGGER.info("Clearing Cache");
- em.getEntityManagerFactory().getCache().evictAll();
- try{
- LOGGER.info("Checking Datatbase for BackUpMonitor.. ");
- et.begin();
- Query query = em.createQuery("select b from BackUpMonitorEntity b where b.resourceNodeName = :nn");
- if(resourceNodeName.equals(ResourceNode.ASTRA.toString())){
- query.setParameter("nn", ResourceNode.ASTRA.toString());
- }else if(resourceNodeName.equals(ResourceNode.BRMS.toString())){
- query.setParameter("nn", ResourceNode.BRMS.toString());
- }
- List<?> bMList = query.getResultList();
- if(bMList.isEmpty()){
- // This is New. create an entry as Master.
- LOGGER.info("Adding resource " + resourceName + " to Database");
- BackUpMonitorEntity bMEntity = new BackUpMonitorEntity();
- bMEntity.setResoruceNodeName(resourceNodeName);
- bMEntity.setResourceName(resourceName);
- bMEntity = setMaster(bMEntity);
- bMEntity.setTimeStamp(new Date());
- em.persist(bMEntity);
- em.flush();
- }else{
- // Check for other Master(s)
- ArrayList<BackUpMonitorEntity> masterEntities = new ArrayList<>();
- // Check for self.
- BackUpMonitorEntity selfEntity = null;
- // Check backup monitor entities.
- for(int i=0; i< bMList.size(); i++){
- BackUpMonitorEntity bMEntity = (BackUpMonitorEntity) bMList.get(i);
- LOGGER.info("Refreshing Entity. ");
- em.refresh(bMEntity);
- if(bMEntity.getFlag().equalsIgnoreCase("MASTER")){
- masterEntities.add(bMEntity);
- }
- if(bMEntity.getResourceName().equals(resourceName)){
- selfEntity = bMEntity;
- }
- }
- if(selfEntity!=null){
- LOGGER.info("Resource Name already Exists: " + resourceName);
- if(selfEntity.getFlag().equalsIgnoreCase("MASTER")){
- // Already Master Mode.
- setFlag(true);
- LOGGER.info(resourceName + " is on Master Mode");
- selfEntity.setTimeStamp(new Date());
- selfEntity.setNotificationRecord(notificationRecord);
- em.persist(selfEntity);
- em.flush();
- setLastNotification(null);
- if(!masterEntities.contains(selfEntity)){
- masterEntities.add(selfEntity);
- }
- }else{
- // Already Slave Mode.
- setFlag(false);
- selfEntity.setTimeStamp(new Date());
- selfEntity.setNotificationRecord(notificationRecord);
- em.persist(selfEntity);
- em.flush();
- LOGGER.info(resourceName + " is on Slave Mode");
- }
- }else{
- // Resource name is null -> No resource with same name.
- selfEntity = new BackUpMonitorEntity();
- selfEntity.setResoruceNodeName(resourceNodeName);
- selfEntity.setResourceName(resourceName);
- selfEntity.setTimeStamp(new Date());
- selfEntity = setSlave(selfEntity);
- setLastNotification(null);
- LOGGER.info("Creating: " + resourceName + " on Slave Mode");
- em.persist(selfEntity);
- em.flush();
- }
- // Correct the database if any errors and perform monitor checks.
- if(masterEntities.size()!=1 || !getFlag()){
- // We are either not master or there are more masters or no masters.
- if(masterEntities.size()==0){
- // No Masters is a problem Convert ourselves to Master.
- selfEntity = setMaster(selfEntity);
- selfEntity.setTimeStamp(new Date());
- selfEntity.setNotificationRecord(notificationRecord);
- LOGGER.info(resourceName + " changed to Master Mode - No Masters available.");
- em.persist(selfEntity);
- em.flush();
- }else {
- if(masterEntities.size()>1){
- // More Masters is a problem, Fix the issue by looking for the latest one and make others Slave.
- BackUpMonitorEntity masterEntity = null;
- for(BackUpMonitorEntity currentEntity: masterEntities){
- if(currentEntity.getFlag().equalsIgnoreCase("MASTER")){
- if(masterEntity==null){
- masterEntity = currentEntity;
- }else if(currentEntity.getTimeStamp().getTime() > masterEntity.getTimeStamp().getTime()){
- // False Master, Update master to slave and take currentMaster as Master.
- masterEntity.setFlag("SLAVE");
- masterEntity.setTimeStamp(new Date());
- em.persist(masterEntity);
- em.flush();
- masterEntity = currentEntity;
- }else{
- currentEntity.setFlag("SLAVE");
- currentEntity.setTimeStamp(new Date());
- em.persist(currentEntity);
- em.flush();
- }
- }
- }
- masterEntities = new ArrayList<>();
- masterEntities.add(masterEntity);
- }
- if(masterEntities.size()==1){
- // Correct Size, Check if Master is Latest, if not Change Master to Slave and Slave to Master.
- BackUpMonitorEntity masterEntity = masterEntities.get(0);
- if(!masterEntity.getResourceName().equals(selfEntity.getResourceName())){
- Date currentTime = new Date();
- long timeDiff = 0;
- timeDiff = currentTime.getTime()-masterEntity.getTimeStamp().getTime();
- if(timeDiff > (pingInterval+1500)){
- // This is down or has an issue and we need to become Master while turning the Master to slave.
- masterEntity.setFlag("SLAVE");
- String lastNotification = null;
- if(masterEntity.getNotificationRecord()!=null){
- lastNotification = calculatePatch(masterEntity.getNotificationRecord());
- }
- setLastNotification(lastNotification);
- em.persist(masterEntity);
- em.flush();
- // Lets Become Master.
- selfEntity = setMaster(selfEntity);
- LOGGER.info("Changing "+ resourceName + " from slave to Master Mode");
- selfEntity.setTimeStamp(new Date());
- selfEntity.setNotificationRecord(notificationRecord);
- em.persist(selfEntity);
- em.flush();
- }
- }
- }else{
- LOGGER.error("Backup Monitor Issue, Masters out of sync, This will be fixed in next interval.");
- }
- }
- }
- }
- et.commit();
- }catch(Exception e){
- LOGGER.error("failed Database Operation " + e.getMessage());
- if(et.isActive()){
- et.rollback();
- }
- throw new Exception(e);
- }
- }
+ // Check Database and set the Flag.
+ private void checkDataBase() throws BackUpMonitorException {
+ EntityTransaction et = em.getTransaction();
+ setNotificationRecord();
+ // Clear Cache.
+ LOGGER.info("Clearing Cache");
+ em.getEntityManagerFactory().getCache().evictAll();
+ try {
+ LOGGER.info("Checking Datatbase for BackUpMonitor.. ");
+ et.begin();
+ Query query = em.createQuery("select b from BackUpMonitorEntity b where b.resourceNodeName = :nn");
+ if (resourceNodeName.equals(ResourceNode.ASTRA.toString())) {
+ query.setParameter("nn", ResourceNode.ASTRA.toString());
+ } else if (resourceNodeName.equals(ResourceNode.BRMS.toString())) {
+ query.setParameter("nn", ResourceNode.BRMS.toString());
+ }
+ List<?> bMList = query.getResultList();
+ if (bMList.isEmpty()) {
+ // This is New. create an entry as Master.
+ LOGGER.info("Adding resource " + resourceName + " to Database");
+ BackUpMonitorEntity bMEntity = new BackUpMonitorEntity();
+ bMEntity.setResoruceNodeName(resourceNodeName);
+ bMEntity.setResourceName(resourceName);
+ bMEntity = setMaster(bMEntity);
+ bMEntity.setTimeStamp(new Date());
+ em.persist(bMEntity);
+ em.flush();
+ } else {
+ // Check for other Master(s)
+ ArrayList<BackUpMonitorEntity> masterEntities = new ArrayList<>();
+ // Check for self.
+ BackUpMonitorEntity selfEntity = null;
+ // Check backup monitor entities.
+ for (int i = 0; i < bMList.size(); i++) {
+ BackUpMonitorEntity bMEntity = (BackUpMonitorEntity) bMList.get(i);
+ LOGGER.info("Refreshing Entity. ");
+ em.refresh(bMEntity);
+ if (bMEntity.getFlag().equalsIgnoreCase("MASTER")) {
+ masterEntities.add(bMEntity);
+ }
+ if (bMEntity.getResourceName().equals(resourceName)) {
+ selfEntity = bMEntity;
+ }
+ }
+ if (selfEntity != null) {
+ LOGGER.info("Resource Name already Exists: " + resourceName);
+ if (selfEntity.getFlag().equalsIgnoreCase("MASTER")) {
+ // Already Master Mode.
+ setFlag(true);
+ LOGGER.info(resourceName + " is on Master Mode");
+ selfEntity.setTimeStamp(new Date());
+ selfEntity.setNotificationRecord(notificationRecord);
+ em.persist(selfEntity);
+ em.flush();
+ setLastNotification(null);
+ if (!masterEntities.contains(selfEntity)) {
+ masterEntities.add(selfEntity);
+ }
+ } else {
+ // Already Slave Mode.
+ setFlag(false);
+ selfEntity.setTimeStamp(new Date());
+ selfEntity.setNotificationRecord(notificationRecord);
+ em.persist(selfEntity);
+ em.flush();
+ LOGGER.info(resourceName + " is on Slave Mode");
+ }
+ } else {
+ // Resource name is null -> No resource with same name.
+ selfEntity = new BackUpMonitorEntity();
+ selfEntity.setResoruceNodeName(resourceNodeName);
+ selfEntity.setResourceName(resourceName);
+ selfEntity.setTimeStamp(new Date());
+ selfEntity = setSlave(selfEntity);
+ setLastNotification(null);
+ LOGGER.info("Creating: " + resourceName + " on Slave Mode");
+ em.persist(selfEntity);
+ em.flush();
+ }
+ // Correct the database if any errors and perform monitor checks.
+ if (masterEntities.size() != 1 || !getFlag()) {
+ // We are either not master or there are more masters or no masters.
+ if (masterEntities.isEmpty()) {
+ // No Masters is a problem Convert ourselves to Master.
+ selfEntity = setMaster(selfEntity);
+ selfEntity.setTimeStamp(new Date());
+ selfEntity.setNotificationRecord(notificationRecord);
+ LOGGER.info(resourceName + " changed to Master Mode - No Masters available.");
+ em.persist(selfEntity);
+ em.flush();
+ } else {
+ if (masterEntities.size() > 1) {
+ // More Masters is a problem, Fix the issue by looking for the latest one and make others
+ // Slave.
+ BackUpMonitorEntity masterEntity = null;
+ for (BackUpMonitorEntity currentEntity : masterEntities) {
+ if (currentEntity.getFlag().equalsIgnoreCase("MASTER")) {
+ if (masterEntity == null) {
+ masterEntity = currentEntity;
+ } else if (currentEntity.getTimeStamp().getTime() > masterEntity.getTimeStamp()
+ .getTime()) {
+ // False Master, Update master to slave and take currentMaster as Master.
+ masterEntity.setFlag("SLAVE");
+ masterEntity.setTimeStamp(new Date());
+ em.persist(masterEntity);
+ em.flush();
+ masterEntity = currentEntity;
+ } else {
+ currentEntity.setFlag("SLAVE");
+ currentEntity.setTimeStamp(new Date());
+ em.persist(currentEntity);
+ em.flush();
+ }
+ }
+ }
+ masterEntities = new ArrayList<>();
+ masterEntities.add(masterEntity);
+ }
+ if (masterEntities.size() == 1) {
+ // Correct Size, Check if Master is Latest, if not Change Master to Slave and Slave to
+ // Master.
+ BackUpMonitorEntity masterEntity = masterEntities.get(0);
+ if (!masterEntity.getResourceName().equals(selfEntity.getResourceName())) {
+ Date currentTime = new Date();
+ long timeDiff = 0;
+ timeDiff = currentTime.getTime() - masterEntity.getTimeStamp().getTime();
+ if (timeDiff > (pingInterval + 1500)) {
+ // This is down or has an issue and we need to become Master while turning the
+ // Master to slave.
+ masterEntity.setFlag("SLAVE");
+ String lastNotification = null;
+ if (masterEntity.getNotificationRecord() != null) {
+ lastNotification = calculatePatch(masterEntity.getNotificationRecord());
+ }
+ setLastNotification(lastNotification);
+ em.persist(masterEntity);
+ em.flush();
+ // Lets Become Master.
+ selfEntity = setMaster(selfEntity);
+ LOGGER.info("Changing " + resourceName + " from slave to Master Mode");
+ selfEntity.setTimeStamp(new Date());
+ selfEntity.setNotificationRecord(notificationRecord);
+ em.persist(selfEntity);
+ em.flush();
+ }
+ }
+ } else {
+ LOGGER.error(
+ "Backup Monitor Issue, Masters out of sync, This will be fixed in next interval.");
+ }
+ }
+ }
+ }
+ et.commit();
+ } catch (Exception e) {
+ LOGGER.error("failed Database Operation " + e.getMessage(), e);
+ if (et.isActive()) {
+ et.rollback();
+ }
+ throw new BackUpMonitorException(e);
+ }
+ }
- // Calculate Patch and return String JsonPatch of the notification Delta.
- private synchronized String calculatePatch(String oldNotificationRecord) {
- try{
- JsonNode notification = JsonLoader.fromString(notificationRecord);
- JsonNode oldNotification = JsonLoader.fromString(oldNotificationRecord);
- JsonNode patchNode = JsonDiff.asJson(oldNotification, notification);
- LOGGER.info("Generated JSON Patch is " + patchNode.toString());
- JsonPatch patch = JsonPatch.fromJson(patchNode);
- try {
- JsonNode patched = patch.apply(oldNotification);
- LOGGER.info("Generated New Notification is : " + patched.toString());
- return patched.toString();
- } catch (JsonPatchException e) {
- LOGGER.error("Error generating Patched " +e.getMessage());
- return null;
- }
- }catch(IOException e){
- LOGGER.error("Error generating Patched " +e.getMessage());
- return null;
- }
- }
+ private static void setNotificationRecord() throws BackUpMonitorException {
+ try {
+ notificationRecord = PolicyUtils.objectToJsonString(NotificationStore.getNotificationRecord());
+ } catch (JsonProcessingException e1) {
+ LOGGER.error("Error retrieving notification record failed. ", e1);
+ throw new BackUpMonitorException(e1);
+ }
+ }
- /**
- * Updates Notification in the Database while Performing the health check.
- *
- * @param notification String format of notification record to store in the Database.
- * @throws Exception
- */
- public synchronized void updateNotification() throws Exception{
- checkDataBase();
- }
+ // Calculate Patch and return String JsonPatch of the notification Delta.
+ private synchronized String calculatePatch(String oldNotificationRecord) {
+ try {
+ JsonNode notification = JsonLoader.fromString(notificationRecord);
+ JsonNode oldNotification = JsonLoader.fromString(oldNotificationRecord);
+ JsonNode patchNode = JsonDiff.asJson(oldNotification, notification);
+ LOGGER.info("Generated JSON Patch is " + patchNode.toString());
+ JsonPatch patch = JsonPatch.fromJson(patchNode);
+ try {
+ JsonNode patched = patch.apply(oldNotification);
+ LOGGER.info("Generated New Notification is : " + patched.toString());
+ return patched.toString();
+ } catch (JsonPatchException e) {
+ LOGGER.error("Error generating Patched " + e.getMessage(), e);
+ return null;
+ }
+ } catch (IOException e) {
+ LOGGER.error("Error generating Patched " + e.getMessage(), e);
+ return null;
+ }
+ }
- // Take in string notification and send the record delta to Handler.
- private static void callHandler(String notification){
- if(handler!=null){
- try {
- PDPNotification notificationObject = PolicyUtils.jsonStringToObject(notification, StdPDPNotification.class);
- if(notificationObject.getNotificationType()!=null){
- LOGGER.info("Performing Patched notification ");
- try{
- handler.runOnNotification(notificationObject);
- notificationRecord = lastMasterNotification;
- }catch (Exception e){
- LOGGER.error("Error in Clients Handler Object : " + e.getMessage());
- }
- }
- } catch (IOException e) {
- LOGGER.info("Error while notification Conversion " + e.getMessage());
- }
- }
- }
+ /**
+ * Updates Notification in the Database while Performing the health check.
+ *
+ * @param notification
+ * String format of notification record to store in the Database.
+ * @throws Exception
+ */
+ public synchronized void updateNotification() throws BackUpMonitorException {
+ checkDataBase();
+ }
- // Used to set LastMasterNotification Record.
- private static void setLastNotification(String notification){
- synchronized(notificationLock){
- lastMasterNotification = notification;
- if(lastMasterNotification!=null && !lastMasterNotification.equals("\"notificationType\":null")){
- if(lastMasterNotification.equals(notificationRecord)){
- return;
- }
- callHandler(notification);
- }
- }
- }
+ // Take in string notification and send the record delta to Handler.
+ private static void callHandler(String notification) {
+ if (handler != null) {
+ try {
+ PDPNotification notificationObject = PolicyUtils.jsonStringToObject(notification,
+ StdPDPNotification.class);
+ if (notificationObject.getNotificationType() != null) {
+ LOGGER.info("Performing Patched notification ");
+ try {
+ handler.runOnNotification(notificationObject);
+ notificationRecord = lastMasterNotification;
+ } catch (Exception e) {
+ LOGGER.error("Error in Clients Handler Object : " + e.getMessage(), e);
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.info("Error while notification Conversion " + e.getMessage(), e);
+ }
+ }
+ }
+
+ // Used to set LastMasterNotification Record.
+ private static void setLastNotification(String notification) {
+ synchronized (notificationLock) {
+ lastMasterNotification = notification;
+ if (lastMasterNotification != null && !lastMasterNotification.equals("\"notificationType\":null")) {
+ if (lastMasterNotification.equals(notificationRecord)) {
+ return;
+ }
+ callHandler(notification);
+ }
+ }
+ }
}
diff --git a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BackUpMonitorException.java b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BackUpMonitorException.java
new file mode 100644
index 000000000..b12e780a0
--- /dev/null
+++ b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BackUpMonitorException.java
@@ -0,0 +1,44 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PolicyEngineUtils
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.utils;
+
+public class BackUpMonitorException extends Exception{
+ private static final long serialVersionUID = 6778134503685443473L;
+
+ public BackUpMonitorException() {
+ }
+
+ public BackUpMonitorException(String message) {
+ super(message);
+ }
+
+ public BackUpMonitorException(Throwable cause){
+ super(cause);
+ }
+
+ public BackUpMonitorException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public BackUpMonitorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusConsumer.java b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusConsumer.java
index bf92835c6..1b31394d0 100644
--- a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusConsumer.java
+++ b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusConsumer.java
@@ -1,8 +1,30 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PolicyEngineUtils
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
package org.onap.policy.utils;
+import java.net.MalformedURLException;
import java.util.List;
import java.util.Properties;
+import com.att.nsa.mr.client.MRClient.MRApiException;
import com.att.nsa.mr.client.impl.MRConsumerImpl;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
@@ -12,9 +34,9 @@ public interface BusConsumer {
* fetch messages
*
* @return list of messages
- * @throws Exception when error encountered by underlying libraries
+ * @throws MRApiException when error encountered by underlying libraries
*/
- public Iterable<String> fetch() throws Exception;
+ public Iterable<String> fetch() throws MRApiException;
/**
* close underlying library consumer
@@ -48,8 +70,7 @@ public interface BusConsumer {
public DmaapConsumerWrapper(List<String> servers, String topic,
String aafLogin, String aafPassword,
String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit)
- throws Exception {
+ int fetchTimeout, int fetchLimit) throws MalformedURLException{
this.consumer = new MRConsumerImpl(servers, topic,
consumerGroup, consumerInstance,
@@ -70,8 +91,12 @@ public interface BusConsumer {
/**
* {@inheritDoc}
*/
- public Iterable<String> fetch() throws Exception {
- return this.consumer.fetch();
+ public Iterable<String> fetch() throws MRApiException {
+ try {
+ return this.consumer.fetch();
+ } catch (Exception e) {
+ throw new MRApiException("Error during MR consumer Fetch ",e);
+ }
}
/**
diff --git a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusPublisher.java b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusPublisher.java
index 792c389ab..3190aa035 100644
--- a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusPublisher.java
+++ b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/BusPublisher.java
@@ -1,3 +1,23 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PolicyEngineUtils
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
package org.onap.policy.utils;
import java.util.ArrayList;
@@ -6,6 +26,8 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.onap.policy.common.logging.eelf.PolicyLogger;
+import org.onap.policy.common.logging.flexlogger.FlexLogger;
+import org.onap.policy.common.logging.flexlogger.Logger;
import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
@@ -31,6 +53,7 @@ public interface BusPublisher {
* DmaapClient library wrapper
*/
public static class DmaapPublisherWrapper implements BusPublisher {
+ private static Logger logger = FlexLogger.getLogger(DmaapPublisherWrapper.class);
/**
* MR based Publisher
*/
@@ -74,16 +97,16 @@ public interface BusPublisher {
*/
@Override
public void close() {
- if (PolicyLogger.isInfoEnabled())
- PolicyLogger.info(DmaapPublisherWrapper.class.getName(),
+ if (logger.isInfoEnabled())
+ logger.info(DmaapPublisherWrapper.class.getName()+
"CREATION: " + this);
try {
this.publisher.close(1, TimeUnit.SECONDS);
} catch (Exception e) {
- PolicyLogger.warn(DmaapPublisherWrapper.class.getName(),
+ logger.warn(DmaapPublisherWrapper.class.getName()+
"CLOSE: " + this + " because of " +
- e.getMessage());
+ e.getMessage(), e);
}
}
diff --git a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/PolicyUtils.java b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/PolicyUtils.java
index 82340c13c..8bc83dcb8 100644
--- a/PolicyEngineUtils/src/main/java/org/onap/policy/utils/PolicyUtils.java
+++ b/PolicyEngineUtils/src/main/java/org/onap/policy/utils/PolicyUtils.java
@@ -70,7 +70,7 @@ public class PolicyUtils {
}
}
- public static String[] decodeBasicEncoding(String encodedValue) throws Exception{
+ public static String[] decodeBasicEncoding(String encodedValue) throws UnsupportedEncodingException {
if(encodedValue!=null && encodedValue.contains("Basic ")){
String encodedUserPassword = encodedValue.replaceFirst("Basic" + " ", "");
String usernameAndPassword = null;