aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcaegen2/services/sonhms/MainThread.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/sonhms/MainThread.java')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/sonhms/MainThread.java123
1 files changed, 108 insertions, 15 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/MainThread.java b/src/main/java/org/onap/dcaegen2/services/sonhms/MainThread.java
index 84775ad..6cbb711 100644
--- a/src/main/java/org/onap/dcaegen2/services/sonhms/MainThread.java
+++ b/src/main/java/org/onap/dcaegen2/services/sonhms/MainThread.java
@@ -23,12 +23,16 @@ package org.onap.dcaegen2.services.sonhms;
import fj.data.Either;
+import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import org.onap.dcaegen2.services.sonhms.model.FapServiceList;
import org.onap.dcaegen2.services.sonhms.model.Notification;
import org.onap.dcaegen2.services.sonhms.utils.ClusterUtils;
import org.onap.dcaegen2.services.sonhms.utils.ThreadUtils;
@@ -36,37 +40,81 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainThread implements Runnable {
+
private static Logger log = LoggerFactory.getLogger(MainThread.class);
- private NewNotification newNotification;
+ private NewSdnrNotification newNotification;
+
+ private NewFmNotification newFmNotification;
private BlockingQueue<List<String>> childStatusQueue;
-
+
private DmaapNotificationsComponent dmaapNotificationsComponent;
-
+
+ private FaultNotificationComponent faultNotificationComponent;
+
private EventHandler eventHandler;
-
+
+ private Map<String, FaultEvent> bufferedFMNotificationCells;
+
+ private List<String> sdnrNotificationCells;
+
+ private Boolean isTimer;
+
+ private Timestamp startTimer;
+
+ List<FaultEvent> fmNotificationToBuffer;
+
/**
* parameterized constructor.
*/
- public MainThread(NewNotification newNotification) {
- super();
+ public MainThread(NewSdnrNotification newNotification, NewFmNotification newFmNotification) {
+ super();
+ this.newFmNotification = newFmNotification;
this.newNotification = newNotification;
childStatusQueue = new LinkedBlockingQueue<>();
dmaapNotificationsComponent = new DmaapNotificationsComponent();
- eventHandler = new EventHandler(childStatusQueue,
- Executors.newFixedThreadPool(Configuration.getInstance().getMaximumClusters()),
- new HashMap<>(), new ClusterUtils(), new ThreadUtils());
- }
-
+ faultNotificationComponent = new FaultNotificationComponent();
+ sdnrNotificationCells = new ArrayList<>();
+ fmNotificationToBuffer = new ArrayList<>();
+ bufferedFMNotificationCells = new HashMap<>();
+ eventHandler = new EventHandler(childStatusQueue,
+ Executors.newFixedThreadPool(Configuration.getInstance().getMaximumClusters()), new HashMap<>(),
+ new ClusterUtils(), new ThreadUtils());
+ isTimer = false;
+ startTimer = new Timestamp(System.currentTimeMillis());
+
+ }
+
@Override
public void run() {
log.info("Starting Main Thread");
// Check for Notifications from Dmaap and Child thread
- Boolean done = false;
-
+ Boolean done = false;
+
while (!done) {
+
+ Timestamp currentTime = new Timestamp(System.currentTimeMillis());
+ if (isTimer) {
+ Long difference = currentTime.getTime() - startTimer.getTime();
+ if (difference > 5000) {
+ log.info("FM handling difference > 5000");
+
+ for (String sdnrCell: sdnrNotificationCells) {
+ bufferedFMNotificationCells.remove(sdnrCell);
+ }
+
+ log.info("FM bufferedFMNotificationCells {}", bufferedFMNotificationCells.values());
+ List<FaultEvent> fmNotificationsToHandle = new ArrayList<>(
+ bufferedFMNotificationCells.values());
+ Boolean result = eventHandler.handleFaultNotification(fmNotificationsToHandle);
+ bufferedFMNotificationCells = new HashMap<>();
+ isTimer = false;
+ log.info("FM notification handling {}", result);
+ }
+ }
+
try {
if (!childStatusQueue.isEmpty()) {
List<String> childState = childStatusQueue.poll();
@@ -76,12 +124,57 @@ public class MainThread implements Runnable {
}
if (newNotification.getNewNotif()) {
- Either<Notification, Integer> notification = dmaapNotificationsComponent.getDmaapNotifications();
+ Either<Notification, Integer> notification = dmaapNotificationsComponent.getSdnrNotifications();
if (notification.isRight()) {
- log.error("Error parsing the notification from SDNR");
+ if (notification.right().value() == 400) {
+ log.error("Error parsing the notification from SDNR");
+ } else if (notification.right().value() == 404) {
+ newNotification.setNewNotif(false);
+ }
} else if (notification.isLeft()) {
+ List<FapServiceList> fapServiceLists = (notification.left().value()).getPayload()
+ .getRadioAccess().getFapServiceList();
+ for (FapServiceList fapServiceList : fapServiceLists) {
+ sdnrNotificationCells.add(fapServiceList.getAlias());
+
+ }
+
Boolean result = eventHandler.handleSdnrNotification(notification.left().value());
log.debug("SDNR notification handling {}", result);
+
+ }
+
+ }
+ if (newFmNotification.getNewNotif()) {
+ log.info("newFmNotification has come");
+
+ String faultCellId = "";
+ Either<List<FaultEvent>, Integer> fmNotifications = faultNotificationComponent
+ .getFaultNotifications();
+ if (fmNotifications.isRight()) {
+ if (fmNotifications.right().value() == 400) {
+ log.info("Error parsing notifications");
+ } else if (fmNotifications.right().value() == 404) {
+ newFmNotification.setNewNotif(false);
+ }
+ } else {
+ for (FaultEvent fmNotification : fmNotifications.left().value()) {
+ faultCellId = fmNotification.getEvent().getCommonEventHeader().getSourceName();
+ bufferedFMNotificationCells.put(faultCellId, fmNotification);
+ log.info("Buffered FM cell {}", faultCellId);
+ log.info("fmNotification{}", fmNotification);
+
+ }
+ log.info("bufferedFMNotificationCells before staring timer {}",
+ bufferedFMNotificationCells.keySet());
+
+ for (String sdnrCell: sdnrNotificationCells) {
+ bufferedFMNotificationCells.remove(sdnrCell);
+ }
+
+ startTimer = new Timestamp(System.currentTimeMillis());
+ isTimer = true;
+ log.info("Buffered FM cell {}", bufferedFMNotificationCells.keySet());
}
}