summaryrefslogtreecommitdiffstats
path: root/core/core-infrastructure/src
diff options
context:
space:
mode:
Diffstat (limited to 'core/core-infrastructure/src')
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java24
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java25
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java10
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java89
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java2
5 files changed, 81 insertions, 69 deletions
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java
index 534bee8af..9d62a9f92 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java
@@ -41,8 +41,8 @@ import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
- * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards the messages to the
- * DataHandler instance that has subscribed to the RawMessageHandler instance.
+ * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards
+ * the messages to the DataHandler instance that has subscribed to the RawMessageHandler instance.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
* @param <MESSAGE> the generic type of message being received
@@ -85,7 +85,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
return;
}
- // Read the messages from the web socket and place them on the message queue for handling by the queue
+ // Read the messages from the web socket and place them on the message queue for handling by
+ // the queue
// processing thread
ObjectInputStream ois = null;
try {
@@ -94,7 +95,7 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
final MessageHolder<MESSAGE> messageHolder = (MessageHolder<MESSAGE>) ois.readObject();
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("message {} recieved from the client {} ", messageHolder.toString(),
+ LOGGER.debug("message {} recieved from the client {} ", messageHolder,
messageHolder == null ? "Apex Engine " : messageHolder.getSenderHostAddress());
}
@@ -111,7 +112,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
}
/**
- * This method is called when a string message is received on a web socket and is to be forwarded to a listener.
+ * This method is called when a string message is received on a web socket and is to be
+ * forwarded to a listener.
*
* @param messageString the message string
*/
@@ -161,6 +163,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
dataHandler.post(messageBlock);
}
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.debug("raw message listening has been interrupted");
break;
}
@@ -172,6 +176,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
dataHandler.post(stringMessage);
}
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.debug("raw message listening has been interrupted");
break;
}
@@ -180,6 +186,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
try {
Thread.sleep(QUEUE_POLL_TIMEOUT);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.debug("raw message listening has been interrupted");
break;
}
@@ -206,7 +214,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
}
/**
- * This method is called when a message is received on a web socket and is to be forwarded to a listener.
+ * This method is called when a message is received on a web socket and is to be forwarded to a
+ * listener.
*
* @param data the message data containing a message
*/
@@ -246,8 +255,5 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
if (listener == null) {
throw new IllegalArgumentException("The listener object cannot be null");
}
- if (dataHandler == null) {
- throw new IllegalStateException("Data handler not initialized");
- }
}
}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java
index 4a756d6f0..36ad3b163 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java
@@ -29,8 +29,8 @@ import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
/**
- * The Class MessagingClient is the class that wraps web socket handling, message sending, and message reception on the
- * client side of a web socket in Apex.
+ * The Class MessagingClient is the class that wraps web socket handling, message sending, and
+ * message reception on the client side of a web socket in Apex.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
* @param <MESSAGE> the generic type
@@ -39,12 +39,13 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE>
// The length of time to wait for a connection to a web socket server before aborting
private static final int CONNECTION_TIMEOUT_TIME_MS = 3000;
- // The length of time to wait before checking if a connection to a web socket server has worked or not
+ // The length of time to wait before checking if a connection to a web socket server has worked
+ // or not
private static final int CONNECTION_TRY_INTERVAL_MS = 100;
/**
- * Constructor of this class, uses its {@link InternalMessageBusClient} superclass to set up the web socket and
- * handle incoming message forwarding.
+ * Constructor of this class, uses its {@link InternalMessageBusClient} superclass to set up the
+ * web socket and handle incoming message forwarding.
*
* @param serverUri The URI of the service
*/
@@ -80,7 +81,11 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE>
public void startConnection() {
// Open the web socket
final WebSocket connection = super.getConnection();
- if (connection != null && !connection.isOpen()) {
+
+ if (connection == null) {
+ throw new IllegalStateException("Could not connect to the server");
+ }
+ if (!connection.isOpen()) {
connect();
}
@@ -129,8 +134,9 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE>
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core.
- * infrastructure. messaging.MessageHolder)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex
+ * .core. infrastructure. messaging.MessageHolder)
*/
@Override
public void send(final MessageHolder<MESSAGE> commands) {
@@ -142,7 +148,8 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE>
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
*/
@Override
public void send(final String messageString) {
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java
index fc401576f..ee6e1a329 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java
@@ -98,6 +98,8 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE
} catch (final IOException ioe) {
LOGGER.catching(ioe);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
// This can happen in normal operation so ignore
}
isStarted = false;
@@ -119,8 +121,9 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core.
- * infrastructure. messaging.MessageHolder)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex
+ * .core. infrastructure. messaging.MessageHolder)
*/
@Override
public void send(final MessageHolder<MESSAGE> message) {
@@ -134,7 +137,8 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
*/
@Override
public void send(final String messageString) {
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java
index d15f86c8a..44b3c7fab 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java
@@ -33,8 +33,8 @@ import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
- * The Class MessagingUtils is a class with static methods used in IPC messaging for finding free ports, translating
- * host names to addresses, serializing objects and flushing object streams.
+ * The Class MessagingUtils is a class with static methods used in IPC messaging for finding free
+ * ports, translating host names to addresses, serializing objects and flushing object streams.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
*/
@@ -42,6 +42,11 @@ public final class MessagingUtils {
// The port number of the lowest user port, ports 0-1023 are system ports
private static final int LOWEST_USER_PORT = 1024;
+ /**
+ * Port number is an unsigned 16-bit integer, so maximum port is 65535
+ */
+ private static final int MAX_PORT_RANGE = 65535;
+
// Logger for this class
private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessagingUtils.class);
@@ -51,8 +56,8 @@ public final class MessagingUtils {
private MessagingUtils() {}
/**
- * This method searches the availability of the port, if the requested port not available, this method will throw an
- * exception.
+ * This method searches the availability of the port, if the requested port not available, this
+ * method will throw an exception.
*
* @param port the port to check
* @return the port verified as being free
@@ -61,33 +66,18 @@ public final class MessagingUtils {
public static int checkPort(final int port) {
LOGGER.entry("Checking availability of port {}", port);
- Socket s = null;
- try {
- // Try to connect to the port, if we can connect then the port is occupied
- s = new Socket("localhost", port);
- LOGGER.debug("Port {} is not available", port);
-
- throw new RuntimeException("could not allocate requested port: " + port);
- } catch (final IOException e) {
- // We found a free port
+ if (isPortAvailable(port)) {
LOGGER.debug("Port {} is available ", port);
return port;
- } finally {
- // Close the socket used to check if the port was free
- if (s != null) {
- try {
- s.close();
- } catch (final IOException e) {
- LOGGER.catching(e);
- LOGGER.warn("could not allocate requested port " + port, e);
- }
- }
}
+ LOGGER.debug("Port {} is not available", port);
+ throw new RuntimeException("could not allocate requested port: " + port);
}
/**
- * This method searches the availability of the port, if the requested port not available,this method will increment
- * the port number and check the availability of that port, this process will continue until it find port available.
+ * This method searches the availability of the port, if the requested port not available,this
+ * method will increment the port number and check the availability of that port, this process
+ * will continue until it reaches max port range which is {@link MAX_PORT_RANGE}.
*
* @param port the first port to check
* @return the port that was found
@@ -96,29 +86,31 @@ public final class MessagingUtils {
public static int findPort(final int port) {
LOGGER.entry("Checking availability of port {}", port);
- Socket s = null;
- try {
- // Try to connect to the port, if we can connect then the port is occupied
- s = new Socket("localhost", port);
- LOGGER.debug("Port {} is not available", port);
+ int availablePort = port;
- // Recurse and try the next port
- return findPort(port + 1);
- } catch (final IOException e) {
- // We found a free port
- LOGGER.debug("Port {} is available ", port);
- return port;
- } finally {
- // Close the socket used to check if the port was free
- if (s != null) {
- try {
- s.close();
- } catch (final IOException e) {
- LOGGER.catching(e);
- LOGGER.warn("could not allocate requested port " + port, e);
- throw new RuntimeException("could not allocate requested port " + port, e);
- }
+ while (availablePort <= MAX_PORT_RANGE) {
+ if (isPortAvailable(availablePort)) {
+ LOGGER.debug("Port {} is available ", availablePort);
+ return availablePort;
}
+ LOGGER.debug("Port {} is not available", availablePort);
+ availablePort++;
+ }
+ throw new RuntimeException("could not find free available");
+ }
+
+ /**
+ * Check if port is available or not
+ *
+ * @param port
+ * @return true if port is available
+ */
+ public static boolean isPortAvailable(final int port) {
+ try (final Socket socket = new Socket("localhost", port)) {
+ return false;
+ } catch (final IOException ignoredException) {
+ LOGGER.trace("Port {} is available", port, ignoredException);
+ return true;
}
}
@@ -136,8 +128,9 @@ public final class MessagingUtils {
}
/**
- * This method searches the availability of the port, if the requested port not available,this method will increment
- * the port number and check the availability, this process will continue until it find port available.
+ * This method searches the availability of the port, if the requested port not available,this
+ * method will increment the port number and check the availability, this process will continue
+ * until it find port available.
*
* @param port the first port to check
* @return the port that was found
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java
index 56b903f38..b5663e452 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java
@@ -42,6 +42,8 @@ public final class ThreadUtilities {
try {
Thread.sleep(milliseconds);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
return false;
}