summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/org/onap/dcae/controller/PersistentEventConnection.java60
1 files changed, 44 insertions, 16 deletions
diff --git a/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java b/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java
index 48d5448..14eb2e3 100644
--- a/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java
+++ b/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java
@@ -59,6 +59,7 @@ public class PersistentEventConnection implements Runnable {
private String event_ruleId;
private EventConnectionState state;
private volatile boolean running = true;
+ private static boolean subscribe = true;
private static final Logger log = LoggerFactory.getLogger(PersistentEventConnection.class);
private boolean modifyEvent;
private String modifyMethod;
@@ -173,13 +174,49 @@ public class PersistentEventConnection implements Runnable {
this.eventParaMap = new HashMap<>();
this.eventParaMap.putAll(builder.parentCtrllr.getParaMap());
printEventParamMap();
- log.info("New persistent connection created " + event_name + "modify event " + modifyEvent);
+ log.info("New persistent connection created " + event_name + " modify event " + modifyEvent);
}
@Override
public void run() {
- Parameters p = null;
+ int sleep_time = 5000;
+ boolean openState = false;
+ EventSource eventSrc = null;
+ while (running) {
+ try {
+ Thread.sleep(sleep_time);
+ if (subscribe) {
+ subscribe = false;
+ subscribe();
+ eventSrc = OpenSseConnection();
+ openState = eventSrc.isOpen();
+ log.info("SSE state " + eventSrc.isOpen());
+ }
+ log.info("SSE state " + openState);
+ if (eventSrc != null && !openState)
+ {
+ log.info("SSE state " + eventSrc.isOpen() + " Resubscribing after 1 minute...");
+ Thread.sleep(sleep_time * 12);
+ /* Resubscribe again */
+ subscribe = true;
+ }
+ } catch (Exception e){
+ /* Other exception we can keep on retrying */
+ log.info("Connection failed: " + e.getMessage());
+ running = true;
+ subscribe = true;
+ if (eventSrc != null) {
+ eventSrc.close();
+ eventSrc = null;
+ }
+ }
+ }
+
+ log.info("Closed connection to SSE source");
+ }
+
+ private void subscribe() {
try {
modifyEventParamMap(Constants.KSETTING_REST_API_URL, getUriMethod(parentCtrllr.getProperties().authorizationEnabled())
+ parentCtrllr.getCfgInfo().getController_restapiUrl()
@@ -209,6 +246,10 @@ public class PersistentEventConnection implements Runnable {
}
log.info("SSE received url " + event_sseventsUrl);
+ }
+ private EventSource OpenSseConnection() {
+ Parameters p = null;
+
try {
p = getParameters(eventParaMap);
} catch (Exception e) {
@@ -232,21 +273,8 @@ public class PersistentEventConnection implements Runnable {
eventSource.register(new DataChangeEventListener(this));
eventSource.open();
log.info("Connected to SSE source");
-
- while (running) {
- try {
- log.info("SSE state " + eventSource.isOpen());
- Thread.sleep(5000);
- } catch (InterruptedException ie) {
- log.info("Exception: " + ie.getMessage());
- Thread.currentThread().interrupt();
- running = false;
- }
- }
- eventSource.close();
- log.info("Closed connection to SSE source");
+ return eventSource;
}
-
private String getAuthorizationToken(String userName, String password) {
return "Basic " + Base64.getEncoder().encodeToString((
userName + ":" + password).getBytes());