diff options
Diffstat (limited to 'core/core-infrastructure')
5 files changed, 81 insertions, 69 deletions
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java index 534bee8af..9d62a9f92 100644 --- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java @@ -41,8 +41,8 @@ import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; /** - * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards the messages to the - * DataHandler instance that has subscribed to the RawMessageHandler instance. + * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards + * the messages to the DataHandler instance that has subscribed to the RawMessageHandler instance. * * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) * @param <MESSAGE> the generic type of message being received @@ -85,7 +85,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS return; } - // Read the messages from the web socket and place them on the message queue for handling by the queue + // Read the messages from the web socket and place them on the message queue for handling by + // the queue // processing thread ObjectInputStream ois = null; try { @@ -94,7 +95,7 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS final MessageHolder<MESSAGE> messageHolder = (MessageHolder<MESSAGE>) ois.readObject(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("message {} recieved from the client {} ", messageHolder.toString(), + LOGGER.debug("message {} recieved from the client {} ", messageHolder, messageHolder == null ? "Apex Engine " : messageHolder.getSenderHostAddress()); } @@ -111,7 +112,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS } /** - * This method is called when a string message is received on a web socket and is to be forwarded to a listener. + * This method is called when a string message is received on a web socket and is to be + * forwarded to a listener. * * @param messageString the message string */ @@ -161,6 +163,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS dataHandler.post(messageBlock); } } catch (final InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); LOGGER.debug("raw message listening has been interrupted"); break; } @@ -172,6 +176,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS dataHandler.post(stringMessage); } } catch (final InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); LOGGER.debug("raw message listening has been interrupted"); break; } @@ -180,6 +186,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS try { Thread.sleep(QUEUE_POLL_TIMEOUT); } catch (final InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); LOGGER.debug("raw message listening has been interrupted"); break; } @@ -206,7 +214,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS } /** - * This method is called when a message is received on a web socket and is to be forwarded to a listener. + * This method is called when a message is received on a web socket and is to be forwarded to a + * listener. * * @param data the message data containing a message */ @@ -246,8 +255,5 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS if (listener == null) { throw new IllegalArgumentException("The listener object cannot be null"); } - if (dataHandler == null) { - throw new IllegalStateException("Data handler not initialized"); - } } } diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java index 4a756d6f0..36ad3b163 100644 --- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java @@ -29,8 +29,8 @@ import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; /** - * The Class MessagingClient is the class that wraps web socket handling, message sending, and message reception on the - * client side of a web socket in Apex. + * The Class MessagingClient is the class that wraps web socket handling, message sending, and + * message reception on the client side of a web socket in Apex. * * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) * @param <MESSAGE> the generic type @@ -39,12 +39,13 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE> // The length of time to wait for a connection to a web socket server before aborting private static final int CONNECTION_TIMEOUT_TIME_MS = 3000; - // The length of time to wait before checking if a connection to a web socket server has worked or not + // The length of time to wait before checking if a connection to a web socket server has worked + // or not private static final int CONNECTION_TRY_INTERVAL_MS = 100; /** - * Constructor of this class, uses its {@link InternalMessageBusClient} superclass to set up the web socket and - * handle incoming message forwarding. + * Constructor of this class, uses its {@link InternalMessageBusClient} superclass to set up the + * web socket and handle incoming message forwarding. * * @param serverUri The URI of the service */ @@ -80,7 +81,11 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE> public void startConnection() { // Open the web socket final WebSocket connection = super.getConnection(); - if (connection != null && !connection.isOpen()) { + + if (connection == null) { + throw new IllegalStateException("Could not connect to the server"); + } + if (!connection.isOpen()) { connect(); } @@ -129,8 +134,9 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE> /* * (non-Javadoc) * - * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core. - * infrastructure. messaging.MessageHolder) + * @see + * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex + * .core. infrastructure. messaging.MessageHolder) */ @Override public void send(final MessageHolder<MESSAGE> commands) { @@ -142,7 +148,8 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE> /* * (non-Javadoc) * - * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String) + * @see + * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String) */ @Override public void send(final String messageString) { diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java index fc401576f..ee6e1a329 100644 --- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java @@ -98,6 +98,8 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE } catch (final IOException ioe) { LOGGER.catching(ioe); } catch (final InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); // This can happen in normal operation so ignore } isStarted = false; @@ -119,8 +121,9 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE /* * (non-Javadoc) * - * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core. - * infrastructure. messaging.MessageHolder) + * @see + * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex + * .core. infrastructure. messaging.MessageHolder) */ @Override public void send(final MessageHolder<MESSAGE> message) { @@ -134,7 +137,8 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE /* * (non-Javadoc) * - * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String) + * @see + * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String) */ @Override public void send(final String messageString) { diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java index d15f86c8a..44b3c7fab 100644 --- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java @@ -33,8 +33,8 @@ import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; /** - * The Class MessagingUtils is a class with static methods used in IPC messaging for finding free ports, translating - * host names to addresses, serializing objects and flushing object streams. + * The Class MessagingUtils is a class with static methods used in IPC messaging for finding free + * ports, translating host names to addresses, serializing objects and flushing object streams. * * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) */ @@ -42,6 +42,11 @@ public final class MessagingUtils { // The port number of the lowest user port, ports 0-1023 are system ports private static final int LOWEST_USER_PORT = 1024; + /** + * Port number is an unsigned 16-bit integer, so maximum port is 65535 + */ + private static final int MAX_PORT_RANGE = 65535; + // Logger for this class private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessagingUtils.class); @@ -51,8 +56,8 @@ public final class MessagingUtils { private MessagingUtils() {} /** - * This method searches the availability of the port, if the requested port not available, this method will throw an - * exception. + * This method searches the availability of the port, if the requested port not available, this + * method will throw an exception. * * @param port the port to check * @return the port verified as being free @@ -61,33 +66,18 @@ public final class MessagingUtils { public static int checkPort(final int port) { LOGGER.entry("Checking availability of port {}", port); - Socket s = null; - try { - // Try to connect to the port, if we can connect then the port is occupied - s = new Socket("localhost", port); - LOGGER.debug("Port {} is not available", port); - - throw new RuntimeException("could not allocate requested port: " + port); - } catch (final IOException e) { - // We found a free port + if (isPortAvailable(port)) { LOGGER.debug("Port {} is available ", port); return port; - } finally { - // Close the socket used to check if the port was free - if (s != null) { - try { - s.close(); - } catch (final IOException e) { - LOGGER.catching(e); - LOGGER.warn("could not allocate requested port " + port, e); - } - } } + LOGGER.debug("Port {} is not available", port); + throw new RuntimeException("could not allocate requested port: " + port); } /** - * This method searches the availability of the port, if the requested port not available,this method will increment - * the port number and check the availability of that port, this process will continue until it find port available. + * This method searches the availability of the port, if the requested port not available,this + * method will increment the port number and check the availability of that port, this process + * will continue until it reaches max port range which is {@link MAX_PORT_RANGE}. * * @param port the first port to check * @return the port that was found @@ -96,29 +86,31 @@ public final class MessagingUtils { public static int findPort(final int port) { LOGGER.entry("Checking availability of port {}", port); - Socket s = null; - try { - // Try to connect to the port, if we can connect then the port is occupied - s = new Socket("localhost", port); - LOGGER.debug("Port {} is not available", port); + int availablePort = port; - // Recurse and try the next port - return findPort(port + 1); - } catch (final IOException e) { - // We found a free port - LOGGER.debug("Port {} is available ", port); - return port; - } finally { - // Close the socket used to check if the port was free - if (s != null) { - try { - s.close(); - } catch (final IOException e) { - LOGGER.catching(e); - LOGGER.warn("could not allocate requested port " + port, e); - throw new RuntimeException("could not allocate requested port " + port, e); - } + while (availablePort <= MAX_PORT_RANGE) { + if (isPortAvailable(availablePort)) { + LOGGER.debug("Port {} is available ", availablePort); + return availablePort; } + LOGGER.debug("Port {} is not available", availablePort); + availablePort++; + } + throw new RuntimeException("could not find free available"); + } + + /** + * Check if port is available or not + * + * @param port + * @return true if port is available + */ + public static boolean isPortAvailable(final int port) { + try (final Socket socket = new Socket("localhost", port)) { + return false; + } catch (final IOException ignoredException) { + LOGGER.trace("Port {} is available", port, ignoredException); + return true; } } @@ -136,8 +128,9 @@ public final class MessagingUtils { } /** - * This method searches the availability of the port, if the requested port not available,this method will increment - * the port number and check the availability, this process will continue until it find port available. + * This method searches the availability of the port, if the requested port not available,this + * method will increment the port number and check the availability, this process will continue + * until it find port available. * * @param port the first port to check * @return the port that was found diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java index 56b903f38..b5663e452 100644 --- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java @@ -42,6 +42,8 @@ public final class ThreadUtilities { try { Thread.sleep(milliseconds); } catch (final InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); return false; } |