summaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java35
-rw-r--r--core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java26
-rw-r--r--core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java21
-rw-r--r--core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxStateFacade.java2
-rw-r--r--core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java2
-rw-r--r--core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/StateFinalizerExecutionContext.java2
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java61
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java25
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java10
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java89
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java2
11 files changed, 142 insertions, 133 deletions
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java
index c2a19a167..5fc7dc8c6 100644
--- a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java
+++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java
@@ -20,8 +20,6 @@
package org.onap.policy.apex.core.deployment;
-import com.google.common.eventbus.Subscribe;
-
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -37,10 +35,13 @@ import org.onap.policy.apex.core.protocols.Message;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
+import com.google.common.eventbus.Subscribe;
+
/**
- * The Class DeploymentClient handles the client side of an EngDep communication session with an Apex server. It runs a
- * thread to handle message sending and session monitoring. It uses a sending queue to queue messages for sending by the
- * client thread and a receiving queue to queue messages received from the Apex engine.
+ * The Class DeploymentClient handles the client side of an EngDep communication session with an
+ * Apex server. It runs a thread to handle message sending and session monitoring. It uses a sending
+ * queue to queue messages for sending by the client thread and a receiving queue to queue messages
+ * received from the Apex engine.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
@@ -90,7 +91,8 @@ public class DeploymentClient implements Runnable {
thisThread.setName(DeploymentClient.class.getName() + "-" + host + ":" + port);
try {
- // Establish a connection to the Apex server for EngDep message communication over Web Sockets
+ // Establish a connection to the Apex server for EngDep message communication over Web
+ // Sockets
service = factory.createClient(new URI("ws://" + host + ":" + port));
service.addMessageListener(new DeploymentClientListener());
@@ -101,7 +103,6 @@ public class DeploymentClient implements Runnable {
LOGGER.error("engine<-->deployment client thread exception", e);
return;
}
-
// Loop forever, sending messages as they appear on the queue
while (true) {
try {
@@ -110,6 +111,8 @@ public class DeploymentClient implements Runnable {
} catch (final InterruptedException e) {
// Message sending has been interrupted, we are finished
LOGGER.debug("engine<-->deployment client interrupted");
+ // restore the interrupt status
+ thisThread.interrupt();
break;
}
}
@@ -169,10 +172,11 @@ public class DeploymentClient implements Runnable {
}
/**
- * The listener interface for receiving deploymentClient events. The class that is interested in processing a
- * deploymentClient event implements this interface, and the object created with that class is registered with a
- * component using the component's {@code addDeploymentClientListener} method. When the deploymentClient event
- * occurs, that object's appropriate method is invoked.
+ * The listener interface for receiving deploymentClient events. The class that is interested in
+ * processing a deploymentClient event implements this interface, and the object created with
+ * that class is registered with a component using the component's
+ * {@code addDeploymentClientListener} method. When the deploymentClient event occurs, that
+ * object's appropriate method is invoked.
*
* @see DeploymentClientEvent
*/
@@ -180,8 +184,9 @@ public class DeploymentClient implements Runnable {
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.policy.apex.core.
- * infrastructure.messaging.impl.ws.messageblock. MessageBlock)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.
+ * policy.apex.core. infrastructure.messaging.impl.ws.messageblock. MessageBlock)
*/
@Subscribe
@Override
@@ -192,7 +197,9 @@ public class DeploymentClient implements Runnable {
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.String)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.
+ * String)
*/
@Override
public void onMessage(final String messageString) {
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java
index d954feaa3..11e870c9c 100644
--- a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java
+++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java
@@ -50,11 +50,12 @@ import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
- * The Class Deployer deploys an Apex model held as an XML file onto an Apex engine. It uses the EngDep protocol to
- * communicate with the engine, with the EngDep protocol being carried on Java web sockets.
+ * The Class Deployer deploys an Apex model held as an XML file onto an Apex engine. It uses the
+ * EngDep protocol to communicate with the engine, with the EngDep protocol being carried on Java
+ * web sockets.
*
- * This deployer is a simple command line deployer that reads the communication parameters and the location of the XML
- * model file as arguments.
+ * This deployer is a simple command line deployer that reads the communication parameters and the
+ * location of the XML model file as arguments.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
@@ -62,7 +63,8 @@ public class EngineServiceFacade {
// Get a reference to the logger
private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceFacade.class);
- // The default message timeout and timeout increment (the amount of time between polls) in milliseconds
+ // The default message timeout and timeout increment (the amount of time between polls) in
+ // milliseconds
private static final int CLIENT_START_WAIT_INTERVAL = 100;
private static final int REPLY_MESSAGE_TIMEOUT_DEFAULT = 10000;
private static final int REPLY_MESSAGE_TIMEOUT_INCREMENT = 100;
@@ -100,7 +102,8 @@ public class EngineServiceFacade {
try {
LOGGER.debug("handshaking with server {}:{} . . .", hostName, port);
- // Use the deployment client to handle the EngDep communication towards the Apex server. It runs a thread to
+ // Use the deployment client to handle the EngDep communication towards the Apex server.
+ // It runs a thread to
// monitor the session and to send
// messages
client = new DeploymentClient(hostName, port);
@@ -184,7 +187,8 @@ public class EngineServiceFacade {
*
* @param modelFileName the name of the model file containing the model to deploy
* @param ignoreConflicts true if conflicts between context in polices is to be ignored
- * @param force true if the model is to be applied even if it is incompatible with the existing model
+ * @param force true if the model is to be applied even if it is incompatible with the existing
+ * model
* @throws ApexException on Apex errors
* @throws IOException on IO exceptions from the operating system
*/
@@ -215,7 +219,8 @@ public class EngineServiceFacade {
* @param modelFileName the name of the model file containing the model to deploy
* @param modelInputStream the stream that holds the Apex model
* @param ignoreConflicts true if conflicts between context in polices is to be ignored
- * @param force true if the model is to be applied even if it is incompatible with the existing model
+ * @param force true if the model is to be applied even if it is incompatible with the existing
+ * model
* @throws ApexException on model deployment errors
*/
public void deployModel(final String modelFileName, final InputStream modelInputStream,
@@ -238,7 +243,8 @@ public class EngineServiceFacade {
*
* @param apexPolicyModel the name of the model to deploy
* @param ignoreConflicts true if conflicts between context in polices is to be ignored
- * @param force true if the model is to be applied even if it is incompatible with the existing model
+ * @param force true if the model is to be applied even if it is incompatible with the existing
+ * model
* @throws ApexException on model deployment errors
*/
public void deployModel(final AxPolicyModel apexPolicyModel, final boolean ignoreConflicts, final boolean force)
@@ -436,6 +442,8 @@ public class EngineServiceFacade {
try {
receivedMessage = client.getReceiveQueue().poll(REPLY_MESSAGE_TIMEOUT_INCREMENT, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.warn("reception of response from server interrupted {}:{}", hostName, port, e);
throw new ApexDeploymentException(
"reception of response from server interrupted " + hostName + ':' + port, e);
diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java
index 2274b7c23..aaec71b47 100644
--- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java
+++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java
@@ -34,7 +34,8 @@ import org.onap.policy.apex.model.policymodel.concepts.AxStateOutput;
import org.onap.policy.apex.model.utilities.Assertions;
/**
- * This class is the output of a state, and is used by the engine to decide what the next state for execution is.
+ * This class is the output of a state, and is used by the engine to decide what the next state for
+ * execution is.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
@@ -115,15 +116,11 @@ public class StateOutput {
for (final Entry<String, Object> incomingFieldEntry : eventFieldMap.entrySet()) {
final String fieldName = incomingFieldEntry.getKey();
final AxField fieldDef = incomingFieldDefinitionMap.get(fieldName);
- try {
-
- // Check if this field is a field in the event
- if (!outputEventDef.getFields().contains(fieldDef)) {
- throw new StateMachineException(
- "field \"" + fieldName + "\" does not exist on event \"" + outputEventDef.getID() + "\"");
- }
- } catch (final Exception e) {
- e.printStackTrace();
+
+ // Check if this field is a field in the event
+ if (!outputEventDef.getFields().contains(fieldDef)) {
+ throw new StateMachineException(
+ "field \"" + fieldName + "\" does not exist on event \"" + outputEventDef.getID() + "\"");
}
// Set the value in the output event
@@ -132,8 +129,8 @@ public class StateOutput {
}
/**
- * This method copies any fields that exist on the input event that also exist on the output event if they are not
- * set on the output event.
+ * This method copies any fields that exist on the input event that also exist on the output
+ * event if they are not set on the output event.
*
* @param incomingEvent The incoming event to copy from
*/
diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxStateFacade.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxStateFacade.java
index 226f06ade..80d7e651b 100644
--- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxStateFacade.java
+++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxStateFacade.java
@@ -40,7 +40,7 @@ public class AxStateFacade {
// CHECKSTYLE:OFF: checkstyle:visibilityModifier Logic has access to this field
/** The full definition information for the state. */
- public AxState state;
+ public final AxState state;
// CHECKSTYLE:ON: checkstyle:visibilityModifier
diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java
index 015f3ae80..85bf96c8d 100644
--- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java
+++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java
@@ -46,7 +46,7 @@ public class AxTaskFacade {
* The full definition of the task we are presenting a facade to, executing logic has full access to the task
* definition.
*/
- public AxTask task;
+ public final AxTask task;
// CHECKSTYLE:ON: checkstyle:visibilityModifier
diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/StateFinalizerExecutionContext.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/StateFinalizerExecutionContext.java
index 2e5971142..874d4def1 100644
--- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/StateFinalizerExecutionContext.java
+++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/StateFinalizerExecutionContext.java
@@ -70,7 +70,7 @@ public class StateFinalizerExecutionContext {
* fields to determine what state output to select. Once a state finalizer has selected a state output, it must
* marshal these fields so that they match the fields required for the event defined in the state output.
*/
- public Map<String, Object> fields;
+ public final Map<String, Object> fields;
// A message specified in the logic
private String message;
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java
index 534bee8af..7e9a31a4f 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java
@@ -20,8 +20,6 @@
package org.onap.policy.apex.core.infrastructure.messaging.impl.ws;
-import com.google.common.eventbus.Subscribe;
-
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -40,9 +38,11 @@ import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
+import com.google.common.eventbus.Subscribe;
+
/**
- * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards the messages to the
- * DataHandler instance that has subscribed to the RawMessageHandler instance.
+ * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards
+ * the messages to the DataHandler instance that has subscribed to the RawMessageHandler instance.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
* @param <MESSAGE> the generic type of message being received
@@ -85,33 +85,35 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
return;
}
- // Read the messages from the web socket and place them on the message queue for handling by the queue
+ // Read the messages from the web socket and place them on the message queue for handling by
+ // the queue
// processing thread
- ObjectInputStream ois = null;
- try {
- ois = new ObjectInputStream(new ByteArrayInputStream(dataByteBuffer.array()));
+
+ try (final ByteArrayInputStream stream = new ByteArrayInputStream(dataByteBuffer.array());
+ final ObjectInputStream ois = new ObjectInputStream(stream);) {
@SuppressWarnings("unchecked")
final MessageHolder<MESSAGE> messageHolder = (MessageHolder<MESSAGE>) ois.readObject();
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("message {} recieved from the client {} ", messageHolder.toString(),
+ LOGGER.debug("message {} recieved from the client {} ", messageHolder,
messageHolder == null ? "Apex Engine " : messageHolder.getSenderHostAddress());
}
- final List<MESSAGE> messages = messageHolder.getMessages();
- if (messages != null) {
- messageBlockQueue.add(new MessageBlock<MESSAGE>(messages, incomingData.getConn()));
+ if (messageHolder != null) {
+ final List<MESSAGE> messages = messageHolder.getMessages();
+ if (messages != null) {
+ messageBlockQueue.add(new MessageBlock<MESSAGE>(messages, incomingData.getConn()));
+ }
}
- } catch (IOException | ClassNotFoundException e) {
+ } catch (final IOException | ClassNotFoundException e) {
LOGGER.error("Failed to process message received");
LOGGER.catching(e);
- } finally {
- closeObjectStream(ois);
}
}
/**
- * This method is called when a string message is received on a web socket and is to be forwarded to a listener.
+ * This method is called when a string message is received on a web socket and is to be
+ * forwarded to a listener.
*
* @param messageString the message string
*/
@@ -128,21 +130,6 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
}
/**
- * Close the {@link ObjectInputStream} stream.
- *
- * @param ois is an instance of {@link ObjectInputStream}
- */
- private void closeObjectStream(final ObjectInputStream ois) {
- if (ois != null) {
- try {
- ois.close();
- } catch (final IOException e) {
- LOGGER.catching(e);
- }
- }
- }
-
- /**
* This thread monitors the message queue and processes messages as they appear on the queue.
*
* @see java.lang.Runnable#run()
@@ -161,6 +148,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
dataHandler.post(messageBlock);
}
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.debug("raw message listening has been interrupted");
break;
}
@@ -172,6 +161,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
dataHandler.post(stringMessage);
}
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.debug("raw message listening has been interrupted");
break;
}
@@ -180,6 +171,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
try {
Thread.sleep(QUEUE_POLL_TIMEOUT);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
LOGGER.debug("raw message listening has been interrupted");
break;
}
@@ -206,7 +199,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
}
/**
- * This method is called when a message is received on a web socket and is to be forwarded to a listener.
+ * This method is called when a message is received on a web socket and is to be forwarded to a
+ * listener.
*
* @param data the message data containing a message
*/
@@ -246,8 +240,5 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
if (listener == null) {
throw new IllegalArgumentException("The listener object cannot be null");
}
- if (dataHandler == null) {
- throw new IllegalStateException("Data handler not initialized");
- }
}
}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java
index 4a756d6f0..36ad3b163 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java
@@ -29,8 +29,8 @@ import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
/**
- * The Class MessagingClient is the class that wraps web socket handling, message sending, and message reception on the
- * client side of a web socket in Apex.
+ * The Class MessagingClient is the class that wraps web socket handling, message sending, and
+ * message reception on the client side of a web socket in Apex.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
* @param <MESSAGE> the generic type
@@ -39,12 +39,13 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE>
// The length of time to wait for a connection to a web socket server before aborting
private static final int CONNECTION_TIMEOUT_TIME_MS = 3000;
- // The length of time to wait before checking if a connection to a web socket server has worked or not
+ // The length of time to wait before checking if a connection to a web socket server has worked
+ // or not
private static final int CONNECTION_TRY_INTERVAL_MS = 100;
/**
- * Constructor of this class, uses its {@link InternalMessageBusClient} superclass to set up the web socket and
- * handle incoming message forwarding.
+ * Constructor of this class, uses its {@link InternalMessageBusClient} superclass to set up the
+ * web socket and handle incoming message forwarding.
*
* @param serverUri The URI of the service
*/
@@ -80,7 +81,11 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE>
public void startConnection() {
// Open the web socket
final WebSocket connection = super.getConnection();
- if (connection != null && !connection.isOpen()) {
+
+ if (connection == null) {
+ throw new IllegalStateException("Could not connect to the server");
+ }
+ if (!connection.isOpen()) {
connect();
}
@@ -129,8 +134,9 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE>
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core.
- * infrastructure. messaging.MessageHolder)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex
+ * .core. infrastructure. messaging.MessageHolder)
*/
@Override
public void send(final MessageHolder<MESSAGE> commands) {
@@ -142,7 +148,8 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE>
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
*/
@Override
public void send(final String messageString) {
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java
index fc401576f..ee6e1a329 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java
@@ -98,6 +98,8 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE
} catch (final IOException ioe) {
LOGGER.catching(ioe);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
// This can happen in normal operation so ignore
}
isStarted = false;
@@ -119,8 +121,9 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core.
- * infrastructure. messaging.MessageHolder)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex
+ * .core. infrastructure. messaging.MessageHolder)
*/
@Override
public void send(final MessageHolder<MESSAGE> message) {
@@ -134,7 +137,8 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE
/*
* (non-Javadoc)
*
- * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
*/
@Override
public void send(final String messageString) {
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java
index d15f86c8a..44b3c7fab 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java
@@ -33,8 +33,8 @@ import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
- * The Class MessagingUtils is a class with static methods used in IPC messaging for finding free ports, translating
- * host names to addresses, serializing objects and flushing object streams.
+ * The Class MessagingUtils is a class with static methods used in IPC messaging for finding free
+ * ports, translating host names to addresses, serializing objects and flushing object streams.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
*/
@@ -42,6 +42,11 @@ public final class MessagingUtils {
// The port number of the lowest user port, ports 0-1023 are system ports
private static final int LOWEST_USER_PORT = 1024;
+ /**
+ * Port number is an unsigned 16-bit integer, so maximum port is 65535
+ */
+ private static final int MAX_PORT_RANGE = 65535;
+
// Logger for this class
private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessagingUtils.class);
@@ -51,8 +56,8 @@ public final class MessagingUtils {
private MessagingUtils() {}
/**
- * This method searches the availability of the port, if the requested port not available, this method will throw an
- * exception.
+ * This method searches the availability of the port, if the requested port not available, this
+ * method will throw an exception.
*
* @param port the port to check
* @return the port verified as being free
@@ -61,33 +66,18 @@ public final class MessagingUtils {
public static int checkPort(final int port) {
LOGGER.entry("Checking availability of port {}", port);
- Socket s = null;
- try {
- // Try to connect to the port, if we can connect then the port is occupied
- s = new Socket("localhost", port);
- LOGGER.debug("Port {} is not available", port);
-
- throw new RuntimeException("could not allocate requested port: " + port);
- } catch (final IOException e) {
- // We found a free port
+ if (isPortAvailable(port)) {
LOGGER.debug("Port {} is available ", port);
return port;
- } finally {
- // Close the socket used to check if the port was free
- if (s != null) {
- try {
- s.close();
- } catch (final IOException e) {
- LOGGER.catching(e);
- LOGGER.warn("could not allocate requested port " + port, e);
- }
- }
}
+ LOGGER.debug("Port {} is not available", port);
+ throw new RuntimeException("could not allocate requested port: " + port);
}
/**
- * This method searches the availability of the port, if the requested port not available,this method will increment
- * the port number and check the availability of that port, this process will continue until it find port available.
+ * This method searches the availability of the port, if the requested port not available,this
+ * method will increment the port number and check the availability of that port, this process
+ * will continue until it reaches max port range which is {@link MAX_PORT_RANGE}.
*
* @param port the first port to check
* @return the port that was found
@@ -96,29 +86,31 @@ public final class MessagingUtils {
public static int findPort(final int port) {
LOGGER.entry("Checking availability of port {}", port);
- Socket s = null;
- try {
- // Try to connect to the port, if we can connect then the port is occupied
- s = new Socket("localhost", port);
- LOGGER.debug("Port {} is not available", port);
+ int availablePort = port;
- // Recurse and try the next port
- return findPort(port + 1);
- } catch (final IOException e) {
- // We found a free port
- LOGGER.debug("Port {} is available ", port);
- return port;
- } finally {
- // Close the socket used to check if the port was free
- if (s != null) {
- try {
- s.close();
- } catch (final IOException e) {
- LOGGER.catching(e);
- LOGGER.warn("could not allocate requested port " + port, e);
- throw new RuntimeException("could not allocate requested port " + port, e);
- }
+ while (availablePort <= MAX_PORT_RANGE) {
+ if (isPortAvailable(availablePort)) {
+ LOGGER.debug("Port {} is available ", availablePort);
+ return availablePort;
}
+ LOGGER.debug("Port {} is not available", availablePort);
+ availablePort++;
+ }
+ throw new RuntimeException("could not find free available");
+ }
+
+ /**
+ * Check if port is available or not
+ *
+ * @param port
+ * @return true if port is available
+ */
+ public static boolean isPortAvailable(final int port) {
+ try (final Socket socket = new Socket("localhost", port)) {
+ return false;
+ } catch (final IOException ignoredException) {
+ LOGGER.trace("Port {} is available", port, ignoredException);
+ return true;
}
}
@@ -136,8 +128,9 @@ public final class MessagingUtils {
}
/**
- * This method searches the availability of the port, if the requested port not available,this method will increment
- * the port number and check the availability, this process will continue until it find port available.
+ * This method searches the availability of the port, if the requested port not available,this
+ * method will increment the port number and check the availability, this process will continue
+ * until it find port available.
*
* @param port the first port to check
* @return the port that was found
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java
index 56b903f38..b5663e452 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java
@@ -42,6 +42,8 @@ public final class ThreadUtilities {
try {
Thread.sleep(milliseconds);
} catch (final InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
return false;
}