From 66b9878c0e44020499517b2dd71801fbca2c125f Mon Sep 17 00:00:00 2001 From: s00370346 Date: Mon, 3 Jun 2019 12:02:02 +0530 Subject: Issue-ID: DCAEGEN2-1587 Restconf resubscribe if external ctlr restarts Signed-off-by: s00370346 Change-Id: Iaa5559d80ee07c019cb8ca23a73218d52952dc5c --- .../dcae/controller/PersistentEventConnection.java | 60 ++++++++++++++++------ 1 file changed, 44 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java b/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java index 48d5448..14eb2e3 100644 --- a/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java +++ b/src/main/java/org/onap/dcae/controller/PersistentEventConnection.java @@ -59,6 +59,7 @@ public class PersistentEventConnection implements Runnable { private String event_ruleId; private EventConnectionState state; private volatile boolean running = true; + private static boolean subscribe = true; private static final Logger log = LoggerFactory.getLogger(PersistentEventConnection.class); private boolean modifyEvent; private String modifyMethod; @@ -173,13 +174,49 @@ public class PersistentEventConnection implements Runnable { this.eventParaMap = new HashMap<>(); this.eventParaMap.putAll(builder.parentCtrllr.getParaMap()); printEventParamMap(); - log.info("New persistent connection created " + event_name + "modify event " + modifyEvent); + log.info("New persistent connection created " + event_name + " modify event " + modifyEvent); } @Override public void run() { - Parameters p = null; + int sleep_time = 5000; + boolean openState = false; + EventSource eventSrc = null; + while (running) { + try { + Thread.sleep(sleep_time); + if (subscribe) { + subscribe = false; + subscribe(); + eventSrc = OpenSseConnection(); + openState = eventSrc.isOpen(); + log.info("SSE state " + eventSrc.isOpen()); + } + log.info("SSE state " + openState); + if (eventSrc != null && !openState) + { + log.info("SSE state " + eventSrc.isOpen() + " Resubscribing after 1 minute..."); + Thread.sleep(sleep_time * 12); + /* Resubscribe again */ + subscribe = true; + } + } catch (Exception e){ + /* Other exception we can keep on retrying */ + log.info("Connection failed: " + e.getMessage()); + running = true; + subscribe = true; + if (eventSrc != null) { + eventSrc.close(); + eventSrc = null; + } + } + } + + log.info("Closed connection to SSE source"); + } + + private void subscribe() { try { modifyEventParamMap(Constants.KSETTING_REST_API_URL, getUriMethod(parentCtrllr.getProperties().authorizationEnabled()) + parentCtrllr.getCfgInfo().getController_restapiUrl() @@ -209,6 +246,10 @@ public class PersistentEventConnection implements Runnable { } log.info("SSE received url " + event_sseventsUrl); + } + private EventSource OpenSseConnection() { + Parameters p = null; + try { p = getParameters(eventParaMap); } catch (Exception e) { @@ -232,21 +273,8 @@ public class PersistentEventConnection implements Runnable { eventSource.register(new DataChangeEventListener(this)); eventSource.open(); log.info("Connected to SSE source"); - - while (running) { - try { - log.info("SSE state " + eventSource.isOpen()); - Thread.sleep(5000); - } catch (InterruptedException ie) { - log.info("Exception: " + ie.getMessage()); - Thread.currentThread().interrupt(); - running = false; - } - } - eventSource.close(); - log.info("Closed connection to SSE source"); + return eventSource; } - private String getAuthorizationToken(String userName, String password) { return "Basic " + Base64.getEncoder().encodeToString(( userName + ":" + password).getBytes()); -- cgit 1.2.3-korg