diff options
Diffstat (limited to 'PolicyEngineAPI')
4 files changed, 68 insertions, 58 deletions
diff --git a/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientEnd.java b/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientEnd.java index 9f9dc37e4..00c8c5f0a 100644 --- a/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientEnd.java +++ b/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientEnd.java @@ -21,7 +21,9 @@ package org.onap.policy.std; import java.net.URI; -import javax.websocket.ClientEndpoint; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import org.onap.policy.api.NotificationHandler; @@ -32,7 +34,6 @@ import org.onap.policy.common.logging.flexlogger.FlexLogger; import org.onap.policy.common.logging.flexlogger.Logger; import org.onap.policy.xacml.api.XACMLErrorConstants; -@ClientEndpoint public class AutoClientEnd extends WebSocketClient { private static StdPDPNotification notification = null; private static StdPDPNotification oldNotification = null; @@ -42,8 +43,9 @@ public class AutoClientEnd extends WebSocketClient { private static String url = null; private static boolean status = false; private static boolean stop = false; - private static boolean message = false; private static boolean error = false; + private static boolean restartNeeded = false; + private static ScheduledExecutorService restartExecutorService = null; private static Logger logger = FlexLogger.getLogger(AutoClientEnd.class.getName()); private AutoClientEnd(URI serverUri) { @@ -53,7 +55,6 @@ public class AutoClientEnd extends WebSocketClient { @Override public void onMessage(String msg) { logger.info("Received Auto Notification from : " + getURI() + ", Notification: " + msg); - AutoClientEnd.message = true; try { AutoClientEnd.notification = NotificationUnMarshal.notificationJSON(msg); } catch (Exception e) { @@ -68,45 +69,38 @@ public class AutoClientEnd extends WebSocketClient { AutoClientEnd.oldNotification = AutoClientEnd.notification; callHandler(); } - - AutoClientEnd.message = false; } @Override public void onClose(int code, String reason, boolean remote) { logger.info("AutoClientEnd disconnected from: " + getURI() + "; Code: " + code + ", reason : " + reason); - if (!AutoClientEnd.stop && !AutoClientEnd.message) { - // This Block of code is executed if there is any Network Failure or - // if the Notification is Down. - logger.error(XACMLErrorConstants.ERROR_SYSTEM_ERROR + "Disconnected from Notification Server"); - AutoClientEnd.client = null; - AutoClientEnd.status = false; - // Try to connect Back to available PDP. - AutoClientEnd.error = true; - start(url); - } - AutoClientEnd.message = false; + AutoClientEnd.restartNeeded = true; } @Override public void onError(Exception ex) { logger.error("XACMLErrorConstants.ERROR_PROCESS_FLOW + Error connecting to: " + getURI() + ", Exception occured ...\n" + ex); - // trying to Restart by self. - stop(); - if (AutoClientEnd.url != null) { - AutoClientEnd.client = null; - AutoClientEnd.status = false; - AutoClientEnd.error = true; - AutoClientEnd.start(AutoClientEnd.url); - } + AutoClientEnd.restartNeeded = true; } @Override public void onOpen(ServerHandshake arg0) { + restartNeeded = false; logger.info("Auto Notification Session Started... " + getURI()); } + private static void restart() { + try { + if (client != null && restartNeeded && !stop) { + logger.info("Auto Notification Session Restarting ... " + getUrl()); + client.reconnect(); + } + } catch (Exception e) { + logger.info("Auto Notification Session Error Started... " + getUrl()); + } + } + /** * Sets the auto. * @@ -156,6 +150,10 @@ public class AutoClientEnd extends WebSocketClient { client = new AutoClientEnd(new URI(url + "notifications")); client.connect(); status = true; + restartExecutorService = Executors.newSingleThreadScheduledExecutor(); + Runnable task = AutoClientEnd::restart; + restartExecutorService.scheduleAtFixedRate(task, 60, 60, TimeUnit.SECONDS); + if (error) { // will not trigger. leave it in to later add checks // The URL's will be in Sync according to design Spec. @@ -171,7 +169,6 @@ public class AutoClientEnd extends WebSocketClient { } } catch (Exception e) { logger.error(XACMLErrorConstants.ERROR_SYSTEM_ERROR + e); - client = null; status = false; changeUrl(); } @@ -179,6 +176,7 @@ public class AutoClientEnd extends WebSocketClient { private static void changeUrl() { // Change the PDP if it is not Up. + stop(); StdPolicyEngine.rotatePDPList(); start(StdPolicyEngine.getPDPURL()); } @@ -192,15 +190,25 @@ public class AutoClientEnd extends WebSocketClient { } logger.info("\n Closing Auto Notification WebSocket Connection.. "); stop = true; + // first stop the restart service + try { + restartExecutorService.shutdown(); + } catch (Exception e1) { + logger.info("\n AutoClientEnd: Error stoppping the restart Scheduler "); + } + + // close the connection try { client.closeBlocking(); - } catch (InterruptedException e) { - logger.info("\n Error Closing Auto Notification WebSocket Connection.. InterruptedException"); + } catch (Exception e) { + logger.error("\n ERROR Closing Auto Notification WebSocket Connection.. "); } + logger.info("\n Closed the Auto Notification WebSocket Connection.. "); client = null; status = false; stop = false; + restartNeeded = false; } private static void callHandler() { diff --git a/PolicyEngineAPI/src/main/java/org/onap/policy/std/ManualClientEnd.java b/PolicyEngineAPI/src/main/java/org/onap/policy/std/ManualClientEnd.java index 2fe6dc006..a67b5402e 100644 --- a/PolicyEngineAPI/src/main/java/org/onap/policy/std/ManualClientEnd.java +++ b/PolicyEngineAPI/src/main/java/org/onap/policy/std/ManualClientEnd.java @@ -22,7 +22,6 @@ package org.onap.policy.std; import java.net.URI; import java.util.concurrent.CountDownLatch; -import javax.websocket.ClientEndpoint; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import org.onap.policy.api.NotificationScheme; @@ -33,7 +32,6 @@ import org.onap.policy.common.logging.flexlogger.Logger; import org.onap.policy.std.StdPDPNotification; import org.onap.policy.xacml.api.XACMLErrorConstants; -@ClientEndpoint public class ManualClientEnd extends WebSocketClient { private static CountDownLatch latch; private static StdPDPNotification notification = null; @@ -63,12 +61,11 @@ public class ManualClientEnd extends WebSocketClient { logger.info("Manual Notification Recieved Message from : " + getURI() + ", Notification: " + message); ManualClientEnd.resultJson = message; try { - ManualClientEnd.notification = NotificationUnMarshal.notificationJSON(message); - latch.countDown(); + ManualClientEnd.notification = NotificationUnMarshal.notificationJSON(message); } catch (Exception e) { logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e); - latch.countDown(); } + latch.countDown(); } @Override @@ -95,7 +92,7 @@ public class ManualClientEnd extends WebSocketClient { client = new ManualClientEnd(new URI(url + "notifications")); client.connect(); latch.await(); - client.close(); + client.closeBlocking(); } catch (Exception e) { logger.error(XACMLErrorConstants.ERROR_SYSTEM_ERROR + e); } diff --git a/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/AutoClientEndTest.java b/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/AutoClientEndTest.java index 4f1ce6f59..5056fceb7 100644 --- a/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/AutoClientEndTest.java +++ b/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/AutoClientEndTest.java @@ -21,12 +21,11 @@ package org.onap.policy.std.test; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.java_websocket.WebSocket; import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.server.WebSocketServer; @@ -37,7 +36,6 @@ import org.onap.policy.api.NotificationHandler; import org.onap.policy.api.NotificationScheme; import org.onap.policy.api.PDPNotification; import org.onap.policy.std.AutoClientEnd; -import org.onap.policy.std.StdPDPNotification; import org.springframework.util.SocketUtils; /** @@ -47,9 +45,9 @@ import org.springframework.util.SocketUtils; public class AutoClientEndTest { private static WebSocketServer ws; - private static int port = 18080; + private static int port = SocketUtils.findAvailableTcpPort(); private static CountDownLatch countServerDownLatch = null; - private StdPDPNotification notification = null; + private static PDPNotification notification = null; /** * Start server. @@ -58,8 +56,8 @@ public class AutoClientEndTest { */ @BeforeClass public static void startServer() throws Exception { - port = SocketUtils.findAvailableTcpPort(); - ws = new WebSocketServer(new InetSocketAddress(port), 16) { + notification = null; + ws = new WebSocketServer(new InetSocketAddress(port), 1) { @Override public void onOpen(WebSocket conn, ClientHandshake handshake) { conn.send("{\"removedPolicies\": [],\"loadedPolicies\": " @@ -91,7 +89,7 @@ public class AutoClientEndTest { }; - ws.setConnectionLostTimeout(30); + ws.setConnectionLostTimeout(0); ws.start(); } @@ -102,8 +100,8 @@ public class AutoClientEndTest { NotificationHandler handler = new NotificationHandler() { @Override - public void notificationReceived(PDPNotification notifi) { - notification = (StdPDPNotification) notifi; + public void notificationReceived(PDPNotification notify) { + notification = notify; countServerDownLatch.countDown(); } @@ -113,17 +111,26 @@ public class AutoClientEndTest { countServerDownLatch = new CountDownLatch(1); AutoClientEnd.start("http://localhost:" + port + "/"); - countServerDownLatch.await(); + countServerDownLatch.await(45, TimeUnit.SECONDS); + + assertNotNull(notification); + + // simulate a server restart and verify client reconnects + countServerDownLatch = new CountDownLatch(1); + ws.stop(30000); + startServer(); + countServerDownLatch.await(60+10, TimeUnit.SECONDS); assertNotNull(notification); - assertTrue(AutoClientEnd.getStatus()); + + AutoClientEnd.stop(); + } @AfterClass - public static void successTests() throws InterruptedException, IOException { - AutoClientEnd.stop(); - ws.stop(); + public static void stopServer() throws InterruptedException, IOException { + ws.stop(30000); } diff --git a/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/ManualClientEndTest.java b/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/ManualClientEndTest.java index 4a09164b5..252fa7e70 100644 --- a/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/ManualClientEndTest.java +++ b/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/ManualClientEndTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.java_websocket.WebSocket; import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.server.WebSocketServer; @@ -40,13 +41,11 @@ import org.springframework.util.SocketUtils; /** * The class <code>ManualClientEndTest</code> contains tests for the class <code>{@link ManualClientEnd}</code>. * - * @generatedBy CodePro at 6/1/16 1:41 PM - * @version $Revision: 1.0 $ */ public class ManualClientEndTest { private static WebSocketServer ws; - private static int port = 18080; + private static int port = SocketUtils.findAvailableTcpPort(); private static CountDownLatch countServerDownLatch = null; private static String recvMsg = null; @@ -57,8 +56,7 @@ public class ManualClientEndTest { */ @BeforeClass public static void startServer() throws Exception { - port = SocketUtils.findAvailableTcpPort(); - ws = new WebSocketServer(new InetSocketAddress(port), 16) { + ws = new WebSocketServer(new InetSocketAddress(port), 1) { @Override public void onOpen(WebSocket conn, ClientHandshake handshake) { @@ -93,16 +91,16 @@ public class ManualClientEndTest { }; - ws.setConnectionLostTimeout(30); + ws.setConnectionLostTimeout(0); ws.start(); } @Test - public void testAutoClient() throws Exception { + public void testManualClient() throws Exception { countServerDownLatch = new CountDownLatch(1); ManualClientEnd.start("http://localhost:" + port + "/"); - countServerDownLatch.await(); + countServerDownLatch.await(45, TimeUnit.SECONDS); assertNotNull(ManualClientEnd.result(NotificationScheme.MANUAL_ALL_NOTIFICATIONS)); assertTrue("Manual".equalsIgnoreCase(recvMsg)); @@ -110,6 +108,6 @@ public class ManualClientEndTest { @AfterClass public static void successTests() throws InterruptedException, IOException { - ws.stop(); + ws.stop(30000); } } |