diff options
Diffstat (limited to 'core')
11 files changed, 142 insertions, 133 deletions
diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java index c2a19a167..5fc7dc8c6 100644 --- a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java +++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/DeploymentClient.java @@ -20,8 +20,6 @@ package org.onap.policy.apex.core.deployment; -import com.google.common.eventbus.Subscribe; - import java.net.URI; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -37,10 +35,13 @@ import org.onap.policy.apex.core.protocols.Message; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; +import com.google.common.eventbus.Subscribe; + /** - * The Class DeploymentClient handles the client side of an EngDep communication session with an Apex server. It runs a - * thread to handle message sending and session monitoring. It uses a sending queue to queue messages for sending by the - * client thread and a receiving queue to queue messages received from the Apex engine. + * The Class DeploymentClient handles the client side of an EngDep communication session with an + * Apex server. It runs a thread to handle message sending and session monitoring. It uses a sending + * queue to queue messages for sending by the client thread and a receiving queue to queue messages + * received from the Apex engine. * * @author Liam Fallon (liam.fallon@ericsson.com) */ @@ -90,7 +91,8 @@ public class DeploymentClient implements Runnable { thisThread.setName(DeploymentClient.class.getName() + "-" + host + ":" + port); try { - // Establish a connection to the Apex server for EngDep message communication over Web Sockets + // Establish a connection to the Apex server for EngDep message communication over Web + // Sockets service = factory.createClient(new URI("ws://" + host + ":" + port)); service.addMessageListener(new DeploymentClientListener()); @@ -101,7 +103,6 @@ public class DeploymentClient implements Runnable { LOGGER.error("engine<-->deployment client thread exception", e); return; } - // Loop forever, sending messages as they appear on the queue while (true) { try { @@ -110,6 +111,8 @@ public class DeploymentClient implements Runnable { } catch (final InterruptedException e) { // Message sending has been interrupted, we are finished LOGGER.debug("engine<-->deployment client interrupted"); + // restore the interrupt status + thisThread.interrupt(); break; } } @@ -169,10 +172,11 @@ public class DeploymentClient implements Runnable { } /** - * The listener interface for receiving deploymentClient events. The class that is interested in processing a - * deploymentClient event implements this interface, and the object created with that class is registered with a - * component using the component's {@code addDeploymentClientListener} method. When the deploymentClient event - * occurs, that object's appropriate method is invoked. + * The listener interface for receiving deploymentClient events. The class that is interested in + * processing a deploymentClient event implements this interface, and the object created with + * that class is registered with a component using the component's + * {@code addDeploymentClientListener} method. When the deploymentClient event occurs, that + * object's appropriate method is invoked. * * @see DeploymentClientEvent */ @@ -180,8 +184,9 @@ public class DeploymentClient implements Runnable { /* * (non-Javadoc) * - * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.policy.apex.core. - * infrastructure.messaging.impl.ws.messageblock. MessageBlock) + * @see + * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap. + * policy.apex.core. infrastructure.messaging.impl.ws.messageblock. MessageBlock) */ @Subscribe @Override @@ -192,7 +197,9 @@ public class DeploymentClient implements Runnable { /* * (non-Javadoc) * - * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.String) + * @see + * org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang. + * String) */ @Override public void onMessage(final String messageString) { diff --git a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java index d954feaa3..11e870c9c 100644 --- a/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java +++ b/core/core-deployment/src/main/java/org/onap/policy/apex/core/deployment/EngineServiceFacade.java @@ -50,11 +50,12 @@ import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; /** - * The Class Deployer deploys an Apex model held as an XML file onto an Apex engine. It uses the EngDep protocol to - * communicate with the engine, with the EngDep protocol being carried on Java web sockets. + * The Class Deployer deploys an Apex model held as an XML file onto an Apex engine. It uses the + * EngDep protocol to communicate with the engine, with the EngDep protocol being carried on Java + * web sockets. * - * This deployer is a simple command line deployer that reads the communication parameters and the location of the XML - * model file as arguments. + * This deployer is a simple command line deployer that reads the communication parameters and the + * location of the XML model file as arguments. * * @author Liam Fallon (liam.fallon@ericsson.com) */ @@ -62,7 +63,8 @@ public class EngineServiceFacade { // Get a reference to the logger private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceFacade.class); - // The default message timeout and timeout increment (the amount of time between polls) in milliseconds + // The default message timeout and timeout increment (the amount of time between polls) in + // milliseconds private static final int CLIENT_START_WAIT_INTERVAL = 100; private static final int REPLY_MESSAGE_TIMEOUT_DEFAULT = 10000; private static final int REPLY_MESSAGE_TIMEOUT_INCREMENT = 100; @@ -100,7 +102,8 @@ public class EngineServiceFacade { try { LOGGER.debug("handshaking with server {}:{} . . .", hostName, port); - // Use the deployment client to handle the EngDep communication towards the Apex server. It runs a thread to + // Use the deployment client to handle the EngDep communication towards the Apex server. + // It runs a thread to // monitor the session and to send // messages client = new DeploymentClient(hostName, port); @@ -184,7 +187,8 @@ public class EngineServiceFacade { * * @param modelFileName the name of the model file containing the model to deploy * @param ignoreConflicts true if conflicts between context in polices is to be ignored - * @param force true if the model is to be applied even if it is incompatible with the existing model + * @param force true if the model is to be applied even if it is incompatible with the existing + * model * @throws ApexException on Apex errors * @throws IOException on IO exceptions from the operating system */ @@ -215,7 +219,8 @@ public class EngineServiceFacade { * @param modelFileName the name of the model file containing the model to deploy * @param modelInputStream the stream that holds the Apex model * @param ignoreConflicts true if conflicts between context in polices is to be ignored - * @param force true if the model is to be applied even if it is incompatible with the existing model + * @param force true if the model is to be applied even if it is incompatible with the existing + * model * @throws ApexException on model deployment errors */ public void deployModel(final String modelFileName, final InputStream modelInputStream, @@ -238,7 +243,8 @@ public class EngineServiceFacade { * * @param apexPolicyModel the name of the model to deploy * @param ignoreConflicts true if conflicts between context in polices is to be ignored - * @param force true if the model is to be applied even if it is incompatible with the existing model + * @param force true if the model is to be applied even if it is incompatible with the existing + * model * @throws ApexException on model deployment errors */ public void deployModel(final AxPolicyModel apexPolicyModel, final boolean ignoreConflicts, final boolean force) @@ -436,6 +442,8 @@ public class EngineServiceFacade { try { receivedMessage = client.getReceiveQueue().poll(REPLY_MESSAGE_TIMEOUT_INCREMENT, TimeUnit.MILLISECONDS); } catch (final InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); LOGGER.warn("reception of response from server interrupted {}:{}", hostName, port, e); throw new ApexDeploymentException( "reception of response from server interrupted " + hostName + ':' + port, e); diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java index 2274b7c23..aaec71b47 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/StateOutput.java @@ -34,7 +34,8 @@ import org.onap.policy.apex.model.policymodel.concepts.AxStateOutput; import org.onap.policy.apex.model.utilities.Assertions; /** - * This class is the output of a state, and is used by the engine to decide what the next state for execution is. + * This class is the output of a state, and is used by the engine to decide what the next state for + * execution is. * * @author Liam Fallon (liam.fallon@ericsson.com) */ @@ -115,15 +116,11 @@ public class StateOutput { for (final Entry<String, Object> incomingFieldEntry : eventFieldMap.entrySet()) { final String fieldName = incomingFieldEntry.getKey(); final AxField fieldDef = incomingFieldDefinitionMap.get(fieldName); - try { - - // Check if this field is a field in the event - if (!outputEventDef.getFields().contains(fieldDef)) { - throw new StateMachineException( - "field \"" + fieldName + "\" does not exist on event \"" + outputEventDef.getID() + "\""); - } - } catch (final Exception e) { - e.printStackTrace(); + + // Check if this field is a field in the event + if (!outputEventDef.getFields().contains(fieldDef)) { + throw new StateMachineException( + "field \"" + fieldName + "\" does not exist on event \"" + outputEventDef.getID() + "\""); } // Set the value in the output event @@ -132,8 +129,8 @@ public class StateOutput { } /** - * This method copies any fields that exist on the input event that also exist on the output event if they are not - * set on the output event. + * This method copies any fields that exist on the input event that also exist on the output + * event if they are not set on the output event. * * @param incomingEvent The incoming event to copy from */ diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxStateFacade.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxStateFacade.java index 226f06ade..80d7e651b 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxStateFacade.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxStateFacade.java @@ -40,7 +40,7 @@ public class AxStateFacade { // CHECKSTYLE:OFF: checkstyle:visibilityModifier Logic has access to this field /** The full definition information for the state. */ - public AxState state; + public final AxState state; // CHECKSTYLE:ON: checkstyle:visibilityModifier diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java index 015f3ae80..85bf96c8d 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/AxTaskFacade.java @@ -46,7 +46,7 @@ public class AxTaskFacade { * The full definition of the task we are presenting a facade to, executing logic has full access to the task * definition. */ - public AxTask task; + public final AxTask task; // CHECKSTYLE:ON: checkstyle:visibilityModifier diff --git a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/StateFinalizerExecutionContext.java b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/StateFinalizerExecutionContext.java index 2e5971142..874d4def1 100644 --- a/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/StateFinalizerExecutionContext.java +++ b/core/core-engine/src/main/java/org/onap/policy/apex/core/engine/executor/context/StateFinalizerExecutionContext.java @@ -70,7 +70,7 @@ public class StateFinalizerExecutionContext { * fields to determine what state output to select. Once a state finalizer has selected a state output, it must * marshal these fields so that they match the fields required for the event defined in the state output. */ - public Map<String, Object> fields; + public final Map<String, Object> fields; // A message specified in the logic private String message; diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java index 534bee8af..7e9a31a4f 100644 --- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java @@ -20,8 +20,6 @@ package org.onap.policy.apex.core.infrastructure.messaging.impl.ws; -import com.google.common.eventbus.Subscribe; - import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.ObjectInputStream; @@ -40,9 +38,11 @@ import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; +import com.google.common.eventbus.Subscribe; + /** - * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards the messages to the - * DataHandler instance that has subscribed to the RawMessageHandler instance. + * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards + * the messages to the DataHandler instance that has subscribed to the RawMessageHandler instance. * * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) * @param <MESSAGE> the generic type of message being received @@ -85,33 +85,35 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS return; } - // Read the messages from the web socket and place them on the message queue for handling by the queue + // Read the messages from the web socket and place them on the message queue for handling by + // the queue // processing thread - ObjectInputStream ois = null; - try { - ois = new ObjectInputStream(new ByteArrayInputStream(dataByteBuffer.array())); + + try (final ByteArrayInputStream stream = new ByteArrayInputStream(dataByteBuffer.array()); + final ObjectInputStream ois = new ObjectInputStream(stream);) { @SuppressWarnings("unchecked") final MessageHolder<MESSAGE> messageHolder = (MessageHolder<MESSAGE>) ois.readObject(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("message {} recieved from the client {} ", messageHolder.toString(), + LOGGER.debug("message {} recieved from the client {} ", messageHolder, messageHolder == null ? "Apex Engine " : messageHolder.getSenderHostAddress()); } - final List<MESSAGE> messages = messageHolder.getMessages(); - if (messages != null) { - messageBlockQueue.add(new MessageBlock<MESSAGE>(messages, incomingData.getConn())); + if (messageHolder != null) { + final List<MESSAGE> messages = messageHolder.getMessages(); + if (messages != null) { + messageBlockQueue.add(new MessageBlock<MESSAGE>(messages, incomingData.getConn())); + } } - } catch (IOException | ClassNotFoundException e) { + } catch (final IOException | ClassNotFoundException e) { LOGGER.error("Failed to process message received"); LOGGER.catching(e); - } finally { - closeObjectStream(ois); } } /** - * This method is called when a string message is received on a web socket and is to be forwarded to a listener. + * This method is called when a string message is received on a web socket and is to be + * forwarded to a listener. * * @param messageString the message string */ @@ -128,21 +130,6 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS } /** - * Close the {@link ObjectInputStream} stream. - * - * @param ois is an instance of {@link ObjectInputStream} - */ - private void closeObjectStream(final ObjectInputStream ois) { - if (ois != null) { - try { - ois.close(); - } catch (final IOException e) { - LOGGER.catching(e); - } - } - } - - /** * This thread monitors the message queue and processes messages as they appear on the queue. * * @see java.lang.Runnable#run() @@ -161,6 +148,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS dataHandler.post(messageBlock); } } catch (final InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); LOGGER.debug("raw message listening has been interrupted"); break; } @@ -172,6 +161,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS dataHandler.post(stringMessage); } } catch (final InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); LOGGER.debug("raw message listening has been interrupted"); break; } @@ -180,6 +171,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS try { Thread.sleep(QUEUE_POLL_TIMEOUT); } catch (final InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); LOGGER.debug("raw message listening has been interrupted"); break; } @@ -206,7 +199,8 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS } /** - * This method is called when a message is received on a web socket and is to be forwarded to a listener. + * This method is called when a message is received on a web socket and is to be forwarded to a + * listener. * * @param data the message data containing a message */ @@ -246,8 +240,5 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS if (listener == null) { throw new IllegalArgumentException("The listener object cannot be null"); } - if (dataHandler == null) { - throw new IllegalStateException("Data handler not initialized"); - } } } diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java index 4a756d6f0..36ad3b163 100644 --- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java @@ -29,8 +29,8 @@ import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; /** - * The Class MessagingClient is the class that wraps web socket handling, message sending, and message reception on the - * client side of a web socket in Apex. + * The Class MessagingClient is the class that wraps web socket handling, message sending, and + * message reception on the client side of a web socket in Apex. * * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) * @param <MESSAGE> the generic type @@ -39,12 +39,13 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE> // The length of time to wait for a connection to a web socket server before aborting private static final int CONNECTION_TIMEOUT_TIME_MS = 3000; - // The length of time to wait before checking if a connection to a web socket server has worked or not + // The length of time to wait before checking if a connection to a web socket server has worked + // or not private static final int CONNECTION_TRY_INTERVAL_MS = 100; /** - * Constructor of this class, uses its {@link InternalMessageBusClient} superclass to set up the web socket and - * handle incoming message forwarding. + * Constructor of this class, uses its {@link InternalMessageBusClient} superclass to set up the + * web socket and handle incoming message forwarding. * * @param serverUri The URI of the service */ @@ -80,7 +81,11 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE> public void startConnection() { // Open the web socket final WebSocket connection = super.getConnection(); - if (connection != null && !connection.isOpen()) { + + if (connection == null) { + throw new IllegalStateException("Could not connect to the server"); + } + if (!connection.isOpen()) { connect(); } @@ -129,8 +134,9 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE> /* * (non-Javadoc) * - * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core. - * infrastructure. messaging.MessageHolder) + * @see + * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex + * .core. infrastructure. messaging.MessageHolder) */ @Override public void send(final MessageHolder<MESSAGE> commands) { @@ -142,7 +148,8 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE> /* * (non-Javadoc) * - * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String) + * @see + * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String) */ @Override public void send(final String messageString) { diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java index fc401576f..ee6e1a329 100644 --- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java @@ -98,6 +98,8 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE } catch (final IOException ioe) { LOGGER.catching(ioe); } catch (final InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); // This can happen in normal operation so ignore } isStarted = false; @@ -119,8 +121,9 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE /* * (non-Javadoc) * - * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core. - * infrastructure. messaging.MessageHolder) + * @see + * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex + * .core. infrastructure. messaging.MessageHolder) */ @Override public void send(final MessageHolder<MESSAGE> message) { @@ -134,7 +137,8 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE /* * (non-Javadoc) * - * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String) + * @see + * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String) */ @Override public void send(final String messageString) { diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java index d15f86c8a..44b3c7fab 100644 --- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java @@ -33,8 +33,8 @@ import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; /** - * The Class MessagingUtils is a class with static methods used in IPC messaging for finding free ports, translating - * host names to addresses, serializing objects and flushing object streams. + * The Class MessagingUtils is a class with static methods used in IPC messaging for finding free + * ports, translating host names to addresses, serializing objects and flushing object streams. * * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) */ @@ -42,6 +42,11 @@ public final class MessagingUtils { // The port number of the lowest user port, ports 0-1023 are system ports private static final int LOWEST_USER_PORT = 1024; + /** + * Port number is an unsigned 16-bit integer, so maximum port is 65535 + */ + private static final int MAX_PORT_RANGE = 65535; + // Logger for this class private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessagingUtils.class); @@ -51,8 +56,8 @@ public final class MessagingUtils { private MessagingUtils() {} /** - * This method searches the availability of the port, if the requested port not available, this method will throw an - * exception. + * This method searches the availability of the port, if the requested port not available, this + * method will throw an exception. * * @param port the port to check * @return the port verified as being free @@ -61,33 +66,18 @@ public final class MessagingUtils { public static int checkPort(final int port) { LOGGER.entry("Checking availability of port {}", port); - Socket s = null; - try { - // Try to connect to the port, if we can connect then the port is occupied - s = new Socket("localhost", port); - LOGGER.debug("Port {} is not available", port); - - throw new RuntimeException("could not allocate requested port: " + port); - } catch (final IOException e) { - // We found a free port + if (isPortAvailable(port)) { LOGGER.debug("Port {} is available ", port); return port; - } finally { - // Close the socket used to check if the port was free - if (s != null) { - try { - s.close(); - } catch (final IOException e) { - LOGGER.catching(e); - LOGGER.warn("could not allocate requested port " + port, e); - } - } } + LOGGER.debug("Port {} is not available", port); + throw new RuntimeException("could not allocate requested port: " + port); } /** - * This method searches the availability of the port, if the requested port not available,this method will increment - * the port number and check the availability of that port, this process will continue until it find port available. + * This method searches the availability of the port, if the requested port not available,this + * method will increment the port number and check the availability of that port, this process + * will continue until it reaches max port range which is {@link MAX_PORT_RANGE}. * * @param port the first port to check * @return the port that was found @@ -96,29 +86,31 @@ public final class MessagingUtils { public static int findPort(final int port) { LOGGER.entry("Checking availability of port {}", port); - Socket s = null; - try { - // Try to connect to the port, if we can connect then the port is occupied - s = new Socket("localhost", port); - LOGGER.debug("Port {} is not available", port); + int availablePort = port; - // Recurse and try the next port - return findPort(port + 1); - } catch (final IOException e) { - // We found a free port - LOGGER.debug("Port {} is available ", port); - return port; - } finally { - // Close the socket used to check if the port was free - if (s != null) { - try { - s.close(); - } catch (final IOException e) { - LOGGER.catching(e); - LOGGER.warn("could not allocate requested port " + port, e); - throw new RuntimeException("could not allocate requested port " + port, e); - } + while (availablePort <= MAX_PORT_RANGE) { + if (isPortAvailable(availablePort)) { + LOGGER.debug("Port {} is available ", availablePort); + return availablePort; } + LOGGER.debug("Port {} is not available", availablePort); + availablePort++; + } + throw new RuntimeException("could not find free available"); + } + + /** + * Check if port is available or not + * + * @param port + * @return true if port is available + */ + public static boolean isPortAvailable(final int port) { + try (final Socket socket = new Socket("localhost", port)) { + return false; + } catch (final IOException ignoredException) { + LOGGER.trace("Port {} is available", port, ignoredException); + return true; } } @@ -136,8 +128,9 @@ public final class MessagingUtils { } /** - * This method searches the availability of the port, if the requested port not available,this method will increment - * the port number and check the availability, this process will continue until it find port available. + * This method searches the availability of the port, if the requested port not available,this + * method will increment the port number and check the availability, this process will continue + * until it find port available. * * @param port the first port to check * @return the port that was found diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java index 56b903f38..b5663e452 100644 --- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java @@ -42,6 +42,8 @@ public final class ThreadUtilities { try { Thread.sleep(milliseconds); } catch (final InterruptedException e) { + // restore the interrupt status + Thread.currentThread().interrupt(); return false; } |