summaryrefslogtreecommitdiffstats
path: root/core/core-infrastructure/src
diff options
context:
space:
mode:
Diffstat (limited to 'core/core-infrastructure/src')
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/JavaHandlingException.java12
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/ClassUtils.java41
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleClassBuilder.java24
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleClassByteCodeFileObject.java2
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleFileManager.java2
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageHolder.java15
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageListener.java6
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingService.java10
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingServiceFactory.java8
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java67
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/WebSocketMessageListener.java8
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/InternalMessageBusClient.java16
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java12
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlock.java10
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java12
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java36
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java31
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WsStringMessageClient.java (renamed from core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageClient.java)34
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WsStringMessageListener.java (renamed from core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageListener.java)2
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WsStringMessageServer.java (renamed from core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageServer.java)18
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WsStringMessager.java (renamed from core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessager.java)4
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java11
-rw-r--r--core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/EndToEndStringMessagingTest.java22
-rw-r--r--core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/StringTestServer.java45
24 files changed, 239 insertions, 209 deletions
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/JavaHandlingException.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/JavaHandlingException.java
index f6ef68105..63bd1c477 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/JavaHandlingException.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/JavaHandlingException.java
@@ -40,19 +40,19 @@ public class JavaHandlingException extends Exception {
/**
* Instantiates a new Java handling exception.
*
- * @param e the exception to wrap
+ * @param exception the exception to wrap
*/
- public JavaHandlingException(final Exception e) {
- super(e);
+ public JavaHandlingException(final Exception exception) {
+ super(exception);
}
/**
* Instantiates a new Java handling exception.
*
* @param message the message
- * @param e the exception to wrap
+ * @param exception the exception to wrap
*/
- public JavaHandlingException(final String message, final Exception e) {
- super(message, e);
+ public JavaHandlingException(final String message, final Exception exception) {
+ super(message, exception);
}
}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/ClassUtils.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/ClassUtils.java
index 919d1b122..16a3369fb 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/ClassUtils.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/ClassUtils.java
@@ -22,6 +22,7 @@ package org.onap.policy.apex.core.infrastructure.java.classes;
import java.io.File;
import java.io.FileInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.URL;
@@ -44,6 +45,9 @@ public abstract class ClassUtils {
// Get a reference to the logger
private static final XLogger LOGGER = XLoggerFactory.getXLogger(ClassUtils.class);
+ // Repeated string constants
+ private static final String CLASS_PATTERN = "\\.class$";
+
// The boot directory in Java for predefined JARs
private static final String SUN_BOOT_LIBRARY_PATH = "sun.boot.library.path";
@@ -89,16 +93,13 @@ public abstract class ClassUtils {
try {
final Class<?> nullclassloader = Class.forName("sun.misc.Launcher");
if (nullclassloader != null) {
- // There a long way and a short way, Short way: causes a warning that cannot be suppressed
- // URL[] moreurls = sun.misc.Launcher.getBootstrapClassPath().getURLs();
- // long way:
- Method m = nullclassloader.getMethod("getBootstrapClassPath");
- if (m != null) {
- final Object cp = m.invoke(null, (Object[]) null);
+ Method mmethod = nullclassloader.getMethod("getBootstrapClassPath");
+ if (mmethod != null) {
+ final Object cp = mmethod.invoke(null, (Object[]) null);
if (cp != null) {
- m = cp.getClass().getMethod("getURLs");
- if (m != null) {
- final URL[] moreurls = (URL[]) (m.invoke(cp, (Object[]) null));
+ mmethod = cp.getClass().getMethod("getURLs");
+ if (mmethod != null) {
+ final URL[] moreurls = (URL[]) (mmethod.invoke(cp, (Object[]) null));
if (moreurls != null && moreurls.length > 0) {
if (urls.length == 0) {
urls = moreurls;
@@ -130,10 +131,8 @@ public abstract class ClassUtils {
// JARs are processed as well
else if (url.getFile().endsWith(".jar")) {
classNameSet.addAll(processJar(urlFile));
- } else {
- // It's a resource or some other non-executable thing
- continue;
}
+ // It's a resource or some other non-executable thing
}
} catch (final Exception e) {
LOGGER.warn("could not get the names of Java classes", e);
@@ -168,9 +167,7 @@ public abstract class ClassUtils {
} else if (child.getName().endsWith(".class") && !child.getName().contains("$")) {
// Process the ".class" file
classNameSet.add(
- child.getAbsolutePath().replace(rootDir, "").replaceFirst("\\.class$", "").replace('/', '.'));
- } else {
- continue;
+ child.getAbsolutePath().replace(rootDir, "").replaceFirst(CLASS_PATTERN, "").replace('/', '.'));
}
}
return classNameSet;
@@ -194,7 +191,7 @@ public abstract class ClassUtils {
fileName = fileName.substring(classesPos + CLASSES_TOKEN.length());
}
- return fileName.replaceFirst("\\.class$", "").replace('/', '.');
+ return fileName.replaceFirst(CLASS_PATTERN, "").replace('/', '.');
}
/**
@@ -202,9 +199,9 @@ public abstract class ClassUtils {
*
* @param jarFile the JAR file
* @return a set of class names
- * @throws Exception on errors processing JARs
+ * @throws IOException on errors processing JARs
*/
- public static Set<String> processJar(final File jarFile) throws Exception {
+ public static Set<String> processJar(final File jarFile) throws IOException {
// Pass the file as an input stream
return processJar(new FileInputStream(jarFile.getAbsolutePath()));
}
@@ -214,9 +211,9 @@ public abstract class ClassUtils {
*
* @param jarInputStream the JAR input stream
* @return a set of class names
- * @throws Exception on errors processing JARs
+ * @throws IOException on errors processing JARs
*/
- public static Set<String> processJar(final InputStream jarInputStream) throws Exception {
+ public static Set<String> processJar(final InputStream jarInputStream) throws IOException {
// The return set
final TreeSet<String> classPathSet = new TreeSet<>();
@@ -229,7 +226,7 @@ public abstract class ClassUtils {
// Iterate over each entry in the JAR
for (ZipEntry entry = zip.getNextEntry(); entry != null; entry = zip.getNextEntry()) {
if (!entry.isDirectory() && entry.getName().endsWith(".class") && !entry.getName().contains("$")) {
- classPathSet.add(entry.getName().replaceFirst("\\.class$", "").replace('/', '.'));
+ classPathSet.add(entry.getName().replaceFirst(CLASS_PATTERN, "").replace('/', '.'));
}
}
zip.close();
@@ -243,7 +240,7 @@ public abstract class ClassUtils {
*/
public static void main(final String[] args) {
for (final String clz : getClassNames()) {
- System.out.println("Found class: " + clz);
+ LOGGER.info("Found class: {}", clz);
}
}
}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleClassBuilder.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleClassBuilder.java
index 7a0b6048e..464a601e1 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleClassBuilder.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleClassBuilder.java
@@ -69,8 +69,8 @@ public class SingleClassBuilder {
*/
public void compile() throws JavaHandlingException {
// Get the list of compilation units, there is only one here
- final List<? extends JavaFileObject> compilationUnits =
- Arrays.asList(new SingleClassCompilationUnit(className, sourceCode));
+ final List<? extends JavaFileObject> compilationUnits = Arrays
+ .asList(new SingleClassCompilationUnit(className, sourceCode));
// Allows us to get diagnostics from the compilation
final DiagnosticCollector<JavaFileObject> diagnosticListener = new DiagnosticCollector<>();
@@ -80,8 +80,8 @@ public class SingleClassBuilder {
// Set up the target file manager and call the compiler
singleFileManager = new SingleFileManager(compiler, new SingleClassByteCodeFileObject(className));
- final JavaCompiler.CompilationTask task =
- compiler.getTask(null, singleFileManager, diagnosticListener, null, null, compilationUnits);
+ final JavaCompiler.CompilationTask task = compiler.getTask(null, singleFileManager, diagnosticListener, null,
+ null, compilationUnits);
// Check if the compilation worked
if (!task.call()) {
@@ -104,9 +104,9 @@ public class SingleClassBuilder {
builder.append("\n");
}
- LOGGER.warn("error compiling Java code for class \"" + className + "\": " + builder.toString());
- throw new JavaHandlingException(
- "error compiling Java code for class \"" + className + "\": " + builder.toString());
+ String message = "error compiling Java code for class \"" + className + "\": " + builder.toString();
+ LOGGER.warn(message);
+ throw new JavaHandlingException(message);
}
}
@@ -120,12 +120,12 @@ public class SingleClassBuilder {
* @throws ClassNotFoundException the byte code for the class is not found in the class loader
* @throws JavaHandlingException the java handling exception if the Java class source code is not compiled
*/
- public Object createObject()
- throws InstantiationException, IllegalAccessException, ClassNotFoundException, JavaHandlingException {
+ public Object createObject() throws InstantiationException, IllegalAccessException, ClassNotFoundException,
+ JavaHandlingException {
if (singleFileManager == null) {
- LOGGER.warn("error instantiating instance for class \"" + className + "\": code may not be compiled");
- throw new JavaHandlingException(
- "error instantiating instance for class \"" + className + "\": code may not be compiled");
+ String message = "error instantiating instance for class \"" + className + "\": code may not be compiled";
+ LOGGER.warn(message);
+ throw new JavaHandlingException(message);
}
return singleFileManager.getClassLoader(null).findClass(className).newInstance();
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleClassByteCodeFileObject.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleClassByteCodeFileObject.java
index 4b7225267..043657854 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleClassByteCodeFileObject.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleClassByteCodeFileObject.java
@@ -35,7 +35,7 @@ import javax.tools.SimpleJavaFileObject;
* basis for {@code JavaFileObject} implementations. Subclasses can override the implementation and specification of any
* method of this class as long as the general contract of {@code JavaFileObject} is obeyed.
*
- * This class holds the byte code for a single class in memory.
+ * <p>This class holds the byte code for a single class in memory.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleFileManager.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleFileManager.java
index cd14b1a06..066765504 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleFileManager.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/compile/singleclass/SingleFileManager.java
@@ -34,7 +34,7 @@ import javax.tools.StandardJavaFileManager;
* is an implementation of {@code JavaFileManager} that forwards the {@code JavaFileManager} methods to a given file
* manager.
*
- * This class instantiates and forwards those requests to a {@link StandardJavaFileManager} instance to act as a
+ * <p>This class instantiates and forwards those requests to a {@link StandardJavaFileManager} instance to act as a
* {@code JavaFileManager} for a Java single file, managing class loading for the class.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageHolder.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageHolder.java
index f74ffa0b3..243e057be 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageHolder.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageHolder.java
@@ -33,9 +33,9 @@ import org.slf4j.ext.XLoggerFactory;
* implementation.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
- * @param <MESSAGE> the generic type of message being handled by a message holder instance
+ * @param <M> the generic type of message being handled by a message holder instance
*/
-public class MessageHolder<MESSAGE> implements Serializable {
+public class MessageHolder<M> implements Serializable {
private static final int HASH_PRIME = 31;
private static final int FOUR_BYTES = 32;
@@ -50,7 +50,7 @@ public class MessageHolder<MESSAGE> implements Serializable {
private final InetAddress senderHostAddress;
// Sequence of message in the message holder
- private final List<MESSAGE> messages;
+ private final List<M> messages;
/**
* Constructor, create the message holder.
@@ -69,7 +69,7 @@ public class MessageHolder<MESSAGE> implements Serializable {
*
* @return the messages
*/
- public List<MESSAGE> getMessages() {
+ public List<M> getMessages() {
return messages;
}
@@ -78,7 +78,7 @@ public class MessageHolder<MESSAGE> implements Serializable {
*
* @param message the message to add
*/
- public void addMessage(final MESSAGE message) {
+ public void addMessage(final M message) {
if (!messages.contains(message)) {
messages.add(message);
} else {
@@ -160,9 +160,6 @@ public class MessageHolder<MESSAGE> implements Serializable {
} else if (!messages.equals(other.messages)) {
return false;
}
- if (creationTime != other.creationTime) {
- return false;
- }
- return true;
+ return creationTime == other.creationTime;
}
}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageListener.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageListener.java
index c8b132423..0aab650d3 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageListener.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageListener.java
@@ -27,16 +27,16 @@ import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.M
* implements this interface.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
- * @param <MESSAGE> of message of any given type that is being listened for and handled
+ * @param <M> of message of any given type that is being listened for and handled
*/
-public interface MessageListener<MESSAGE> {
+public interface MessageListener<M> {
/**
* This method is called when a message block is received on a web socket and is to be forwarded to a listener.
*
* @param data the message data containing a message
*/
- void onMessage(MessageBlock<MESSAGE> data);
+ void onMessage(MessageBlock<M> data);
/**
* This method is called when a string message is received on a web socket and is to be forwarded to a listener.
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingService.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingService.java
index 7e91b95ea..352e70806 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingService.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingService.java
@@ -25,9 +25,9 @@ package org.onap.policy.apex.core.infrastructure.messaging;
* messaging.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
- * @param <MESSAGE> the type of message being passed by an implementation of Apex messaging
+ * @param <M> the type of message being passed by an implementation of Apex messaging
*/
-public interface MessagingService<MESSAGE> {
+public interface MessagingService<M> {
/**
* Start the messaging connection.
@@ -51,7 +51,7 @@ public interface MessagingService<MESSAGE> {
*
* @param messageHolder The message holder holding the messages to be sent
*/
- void send(MessageHolder<MESSAGE> messageHolder);
+ void send(MessageHolder<M> messageHolder);
/**
* Send a string message on the connection.
@@ -65,12 +65,12 @@ public interface MessagingService<MESSAGE> {
*
* @param messageListener the message listener
*/
- void addMessageListener(MessageListener<MESSAGE> messageListener);
+ void addMessageListener(MessageListener<M> messageListener);
/**
* Removes the message listener.
*
* @param messageListener the message listener
*/
- void removeMessageListener(MessageListener<MESSAGE> messageListener);
+ void removeMessageListener(MessageListener<M> messageListener);
}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingServiceFactory.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingServiceFactory.java
index 1d08fac74..b38b32f0e 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingServiceFactory.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingServiceFactory.java
@@ -30,9 +30,9 @@ import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server.Message
* A factory class to create a "server" or "client" type Messaging Service.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
- * @param <MESSAGE> the generic type of message to be handled by this messaging service
+ * @param <M> the generic type of message to be handled by this messaging service
*/
-public class MessagingServiceFactory<MESSAGE> {
+public class MessagingServiceFactory<M> {
/**
* Create a web socket server instance and returns to the caller.
@@ -40,7 +40,7 @@ public class MessagingServiceFactory<MESSAGE> {
* @param address the address of the server machine
* @return the messaging service
*/
- public MessagingService<MESSAGE> createServer(final InetSocketAddress address) {
+ public MessagingService<M> createServer(final InetSocketAddress address) {
return new MessageServerImpl<>(address);
}
@@ -50,7 +50,7 @@ public class MessagingServiceFactory<MESSAGE> {
* @param uri the URI of the server to connect to
* @return an instance of {@link MessagingService}
*/
- public MessagingService<MESSAGE> createClient(final URI uri) {
+ public MessagingService<M> createClient(final URI uri) {
if (uri == null) {
throw new IllegalArgumentException("URI cannot be null");
}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java
index 7e9a31a4f..e0bf0ea6b 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java
@@ -20,6 +20,8 @@
package org.onap.policy.apex.core.infrastructure.messaging.impl.ws;
+import com.google.common.eventbus.Subscribe;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -38,19 +40,20 @@ import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
-import com.google.common.eventbus.Subscribe;
-
/**
- * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards
- * the messages to the DataHandler instance that has subscribed to the RawMessageHandler instance.
+ * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards the messages to the
+ * DataHandler instance that has subscribed to the RawMessageHandler instance.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
- * @param <MESSAGE> the generic type of message being received
+ * @param <M> the generic type of message being received
*/
-public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESSAGE>, Runnable {
+public class RawMessageHandler<M> implements WebSocketMessageListener<M>, Runnable {
// The logger for this class
private static final XLogger LOGGER = XLoggerFactory.getXLogger(RawMessageHandler.class);
+ // Repeated string constants
+ private static final String RAW_MESSAGE_LISTENING_INTERRUPTED = "raw message listening has been interrupted";
+
// The amount of time to sleep during shutdown for the thread of this message handler to stop
private static final int SHUTDOWN_WAIT_TIME = 10;
@@ -58,13 +61,13 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
private static final long QUEUE_POLL_TIMEOUT = 50;
// A queue that temporarily holds message blocks
- private final BlockingQueue<MessageBlock<MESSAGE>> messageBlockQueue = new LinkedBlockingDeque<>();
+ private final BlockingQueue<MessageBlock<M>> messageBlockQueue = new LinkedBlockingDeque<>();
// A queue that temporarily holds message blocks
private final BlockingQueue<String> stringMessageQueue = new LinkedBlockingDeque<>();
// Client applications that have subscribed for messages
- private final MessageBlockHandler<MESSAGE> dataHandler = new MessageBlockHandler<MESSAGE>("data-processor");
+ private final MessageBlockHandler<M> dataHandler = new MessageBlockHandler<>("data-processor");
// The thread that the raw message handler is receiving messages on
private Thread thisThread = null;
@@ -90,19 +93,19 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
// processing thread
try (final ByteArrayInputStream stream = new ByteArrayInputStream(dataByteBuffer.array());
- final ObjectInputStream ois = new ObjectInputStream(stream);) {
+ final ObjectInputStream ois = new ObjectInputStream(stream);) {
@SuppressWarnings("unchecked")
- final MessageHolder<MESSAGE> messageHolder = (MessageHolder<MESSAGE>) ois.readObject();
+ final MessageHolder<M> messageHolder = (MessageHolder<M>) ois.readObject();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("message {} recieved from the client {} ", messageHolder,
- messageHolder == null ? "Apex Engine " : messageHolder.getSenderHostAddress());
+ messageHolder == null ? "Apex Engine " : messageHolder.getSenderHostAddress());
}
if (messageHolder != null) {
- final List<MESSAGE> messages = messageHolder.getMessages();
+ final List<M> messages = messageHolder.getMessages();
if (messages != null) {
- messageBlockQueue.add(new MessageBlock<MESSAGE>(messages, incomingData.getConn()));
+ messageBlockQueue.add(new MessageBlock<M>(messages, incomingData.getConn()));
}
}
} catch (final IOException | ClassNotFoundException e) {
@@ -112,8 +115,7 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
}
/**
- * This method is called when a string message is received on a web socket and is to be
- * forwarded to a listener.
+ * This method is called when a string message is received on a web socket and is to be forwarded to a listener.
*
* @param messageString the message string
*/
@@ -130,6 +132,16 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
}
/**
+ * This method is called when a message is received on a web socket and is to be forwarded to a listener.
+ *
+ * @param data the message data containing a message
+ */
+ @Override
+ public void onMessage(final MessageBlock<M> data) {
+ throw new UnsupportedOperationException("this operation is not supported");
+ }
+
+ /**
* This thread monitors the message queue and processes messages as they appear on the queue.
*
* @see java.lang.Runnable#run()
@@ -143,14 +155,14 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
while (thisThread.isAlive() && !thisThread.isInterrupted()) {
try {
// Read message block messages from the queue and pass it to the data handler
- MessageBlock<MESSAGE> messageBlock = null;
+ MessageBlock<M> messageBlock = null;
while ((messageBlock = messageBlockQueue.poll(1, TimeUnit.MILLISECONDS)) != null) {
dataHandler.post(messageBlock);
}
} catch (final InterruptedException e) {
// restore the interrupt status
Thread.currentThread().interrupt();
- LOGGER.debug("raw message listening has been interrupted");
+ LOGGER.debug(RAW_MESSAGE_LISTENING_INTERRUPTED);
break;
}
@@ -163,7 +175,7 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
} catch (final InterruptedException e) {
// restore the interrupt status
Thread.currentThread().interrupt();
- LOGGER.debug("raw message listening has been interrupted");
+ LOGGER.debug(RAW_MESSAGE_LISTENING_INTERRUPTED);
break;
}
@@ -173,7 +185,7 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
} catch (final InterruptedException e) {
// restore the interrupt status
Thread.currentThread().interrupt();
- LOGGER.debug("raw message listening has been interrupted");
+ LOGGER.debug(RAW_MESSAGE_LISTENING_INTERRUPTED);
break;
}
}
@@ -199,23 +211,12 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
}
/**
- * This method is called when a message is received on a web socket and is to be forwarded to a
- * listener.
- *
- * @param data the message data containing a message
- */
- @Override
- public void onMessage(final MessageBlock<MESSAGE> data) {
- throw new UnsupportedOperationException("this operation is not supported");
- }
-
- /**
* Register a data forwarder to which messages coming in on the web socket will be forwarded.
*
* @param listener The listener to register
*/
@Override
- public void registerDataForwarder(final MessageListener<MESSAGE> listener) {
+ public void registerDataForwarder(final MessageListener<M> listener) {
stateCheck(listener);
dataHandler.registerMessageHandler(listener);
}
@@ -226,7 +227,7 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
* @param listener The listener to unregister
*/
@Override
- public void unRegisterDataForwarder(final MessageListener<MESSAGE> listener) {
+ public void unRegisterDataForwarder(final MessageListener<M> listener) {
stateCheck(listener);
dataHandler.unRegisterMessageHandler(listener);
}
@@ -236,7 +237,7 @@ public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESS
*
* @param listener the listener to check
*/
- private void stateCheck(final MessageListener<MESSAGE> listener) {
+ private void stateCheck(final MessageListener<M> listener) {
if (listener == null) {
throw new IllegalArgumentException("The listener object cannot be null");
}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/WebSocketMessageListener.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/WebSocketMessageListener.java
index aa951b4ec..529e887e4 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/WebSocketMessageListener.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/WebSocketMessageListener.java
@@ -30,10 +30,10 @@ import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.R
* object's appropriate method is invoked.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
- * @param <MESSAGE> the generic type
+ * @param <M> the generic type
* @see RawMessageBlock
*/
-public interface WebSocketMessageListener<MESSAGE> extends MessageListener<MESSAGE>, Runnable {
+public interface WebSocketMessageListener<M> extends MessageListener<M>, Runnable {
/**
* This method is called by the class with which this message listener has been registered.
@@ -47,12 +47,12 @@ public interface WebSocketMessageListener<MESSAGE> extends MessageListener<MESSA
*
* @param listener The listener to register
*/
- void registerDataForwarder(MessageListener<MESSAGE> listener);
+ void registerDataForwarder(MessageListener<M> listener);
/**
* Unregister a data forwarder that was previously registered on the web socket listener.
*
* @param listener The listener to unregister
*/
- void unRegisterDataForwarder(MessageListener<MESSAGE> listener);
+ void unRegisterDataForwarder(MessageListener<M> listener);
}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/InternalMessageBusClient.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/InternalMessageBusClient.java
index 9f7f89d8c..17391fb89 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/InternalMessageBusClient.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/InternalMessageBusClient.java
@@ -36,9 +36,9 @@ import org.slf4j.ext.XLoggerFactory;
* receive messages on the web socket.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
- * @param <MESSAGE> the generic type of message being handled
+ * @param <M> the generic type of message being handled
*/
-abstract class InternalMessageBusClient<MESSAGE> extends WebSocketClientImpl {
+abstract class InternalMessageBusClient<M> extends WebSocketClientImpl {
private static final int THREAD_FACTORY_STACK_SIZE = 256;
// The logger for this class
@@ -48,15 +48,15 @@ abstract class InternalMessageBusClient<MESSAGE> extends WebSocketClientImpl {
private static final String RAW_EVENT_BUS = "Raw-Event-Bus";
// This instance handles the raw data received from the web socket
- private final RawMessageHandler<MESSAGE> rawMessageHandler = new RawMessageHandler<>();
+ private final RawMessageHandler<M> rawMessageHandler = new RawMessageHandler<>();
// The message block handler to which to pass messages coming in on this client
- private MessageBlockHandler<MESSAGE> messageBlockHandler = null;
+ private MessageBlockHandler<M> messageBlockHandler = null;
// The raw message handler uses a thread to process incoming events off a queue, this class owns and controls that
// thread. These fields hold the thread and
// the thread factory for creating threads.
- private ApplicationThreadFactory tFactory =
+ private ApplicationThreadFactory threadFactory =
new ApplicationThreadFactory("ws-client-thread", THREAD_FACTORY_STACK_SIZE);
private Thread forwarderThread = null;
@@ -75,7 +75,7 @@ abstract class InternalMessageBusClient<MESSAGE> extends WebSocketClientImpl {
messageBlockHandler.registerMessageHandler(rawMessageHandler);
// Create the thread that manages the queue in the data handler
- forwarderThread = tFactory.newThread(rawMessageHandler);
+ forwarderThread = threadFactory.newThread(rawMessageHandler);
forwarderThread.start();
LOGGER.exit();
@@ -109,7 +109,7 @@ abstract class InternalMessageBusClient<MESSAGE> extends WebSocketClientImpl {
*
* @param listener a simple class, that listens for the events from Event
*/
- public void addMessageListener(final MessageListener<MESSAGE> listener) {
+ public void addMessageListener(final MessageListener<M> listener) {
rawMessageHandler.registerDataForwarder(listener);
}
@@ -118,7 +118,7 @@ abstract class InternalMessageBusClient<MESSAGE> extends WebSocketClientImpl {
*
* @param listener the listener
*/
- public void removeMessageListener(final MessageListener<MESSAGE> listener) {
+ public void removeMessageListener(final MessageListener<M> listener) {
rawMessageHandler.unRegisterDataForwarder(listener);
}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java
index 36ad3b163..dd9aac122 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java
@@ -33,9 +33,9 @@ import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
* message reception on the client side of a web socket in Apex.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
- * @param <MESSAGE> the generic type
+ * @param <M> the generic type
*/
-public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE> implements MessagingService<MESSAGE> {
+public class MessagingClient<M> extends InternalMessageBusClient<M> implements MessagingService<M> {
// The length of time to wait for a connection to a web socket server before aborting
private static final int CONNECTION_TIMEOUT_TIME_MS = 3000;
@@ -102,7 +102,7 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE>
*/
private boolean waitforConnection(final WebSocket connection) {
// The total time we have before timeout
- int timeoutMSCounter = CONNECTION_TIMEOUT_TIME_MS;
+ int timeoutMsCounter = CONNECTION_TIMEOUT_TIME_MS;
// Check the connection state
do {
@@ -112,7 +112,7 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE>
case CLOSING:
// Not connected yet so wait for the try interval
ThreadUtilities.sleep(CONNECTION_TRY_INTERVAL_MS);
- timeoutMSCounter -= CONNECTION_TRY_INTERVAL_MS;
+ timeoutMsCounter -= CONNECTION_TRY_INTERVAL_MS;
break;
case OPEN:
// Connection is open, happy days
@@ -125,7 +125,7 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE>
}
}
// While the timeout value has not expired
- while (timeoutMSCounter > 0);
+ while (timeoutMsCounter > 0);
// We have timed out
return false;
@@ -139,7 +139,7 @@ public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE>
* .core. infrastructure. messaging.MessageHolder)
*/
@Override
- public void send(final MessageHolder<MESSAGE> commands) {
+ public void send(final MessageHolder<M> commands) {
// Get the connection and send the message
final WebSocket connection = super.getConnection();
connection.send(MessagingUtils.serializeObject(commands));
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlock.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlock.java
index 70b1d2c3a..1c6852686 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlock.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlock.java
@@ -28,12 +28,12 @@ import org.java_websocket.WebSocket;
* This class encapsulate messages and the web socket on which they are handled.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
- * @param <MESSAGE> the generic type of message being handled
+ * @param <M> the generic type of message being handled
*/
-public final class MessageBlock<MESSAGE> {
+public final class MessageBlock<M> {
// List of Messages received on a web socket
- private final List<MESSAGE> messages;
+ private final List<M> messages;
// The web socket on which the messages are handled
private final WebSocket webSocket;
@@ -44,7 +44,7 @@ public final class MessageBlock<MESSAGE> {
* @param messages the messages in the message block
* @param webSocket the web socket used to handle the message block
*/
- public MessageBlock(final List<MESSAGE> messages, final WebSocket webSocket) {
+ public MessageBlock(final List<M> messages, final WebSocket webSocket) {
this.messages = messages;
this.webSocket = webSocket;
}
@@ -54,7 +54,7 @@ public final class MessageBlock<MESSAGE> {
*
* @return the messages
*/
- public List<MESSAGE> getMessages() {
+ public List<M> getMessages() {
return messages;
}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java
index 4265718db..123305b07 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java
@@ -31,9 +31,9 @@ import org.slf4j.ext.XLoggerFactory;
* event bus.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
- * @param <MESSAGE> the generic type
+ * @param <M> the generic type
*/
-public class MessageBlockHandler<MESSAGE> {
+public class MessageBlockHandler<M> {
// Logger for this class
private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessageBlockHandler.class);
@@ -72,7 +72,7 @@ public class MessageBlockHandler<MESSAGE> {
*
* @param messageBlock the block containing typed messages
*/
- public void post(final MessageBlock<MESSAGE> messageBlock) {
+ public void post(final MessageBlock<M> messageBlock) {
if (messageBlock.getMessages() != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("new data message recieved from {}", messageBlock.getConnection() == null ? "server"
@@ -90,7 +90,7 @@ public class MessageBlockHandler<MESSAGE> {
public void post(final String messageString) {
if (messageString != null) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("new string message recieved from server: " + messageString);
+ LOGGER.debug("new string message recieved from server: {}", messageString);
}
eventBus.post(messageString);
}
@@ -101,7 +101,7 @@ public class MessageBlockHandler<MESSAGE> {
*
* @param listener is an instance of WebSocketMessageListener
*/
- public void registerMessageHandler(final MessageListener<MESSAGE> listener) {
+ public void registerMessageHandler(final MessageListener<M> listener) {
LOGGER.entry(listener);
if (listener == null) {
throw new IllegalArgumentException("listener object cannot be null");
@@ -116,7 +116,7 @@ public class MessageBlockHandler<MESSAGE> {
*
* @param listener the listener
*/
- public void unRegisterMessageHandler(final MessageListener<MESSAGE> listener) {
+ public void unRegisterMessageHandler(final MessageListener<M> listener) {
if (listener == null) {
throw new IllegalArgumentException("listener object cannot be null");
}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java
index 8e65bbf98..a436bd7e3 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java
@@ -38,9 +38,9 @@ import org.slf4j.ext.XLoggerFactory;
* receive messages on the web socket.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
- * @param <MESSAGE> the generic type
+ * @param <M> the generic type
*/
-abstract class InternalMessageBusServer<MESSAGE> extends WebSocketServerImpl implements MessagingService<MESSAGE> {
+abstract class InternalMessageBusServer<M> extends WebSocketServerImpl implements MessagingService<M> {
// Logger for this class
private static final XLogger LOGGER = XLoggerFactory.getXLogger(InternalMessageBusServer.class);
@@ -50,15 +50,15 @@ abstract class InternalMessageBusServer<MESSAGE> extends WebSocketServerImpl imp
private static final String RAW_EVENT_BUS = "Raw-Event-Bus";
// This instance handles the raw data received from the web socket
- private final RawMessageHandler<MESSAGE> rawMessageHandler = new RawMessageHandler<>();
+ private final RawMessageHandler<M> rawMessageHandler = new RawMessageHandler<>();
// The message block handler to which to pass messages coming in on this client
- private MessageBlockHandler<MESSAGE> messageBlockHandler = null;
+ private MessageBlockHandler<M> messageBlockHandler = null;
// The raw message handler uses a thread to process incoming events off a queue, this class owns and controls that
// thread. These fields hold the thread and
// the thread factory for creating threads.
- private ApplicationThreadFactory tFactory =
+ private ApplicationThreadFactory threadFactory =
new ApplicationThreadFactory("ws-server-thread", THREAD_FACTORY_STACK_SIZE);
private Thread forwarderThread = null;
@@ -77,7 +77,7 @@ abstract class InternalMessageBusServer<MESSAGE> extends WebSocketServerImpl imp
messageBlockHandler.registerMessageHandler(rawMessageHandler);
// Create the thread that manages the queue in the data handler
- forwarderThread = tFactory.newThread(rawMessageHandler);
+ forwarderThread = threadFactory.newThread(rawMessageHandler);
forwarderThread.start();
LOGGER.exit();
@@ -95,13 +95,23 @@ abstract class InternalMessageBusServer<MESSAGE> extends WebSocketServerImpl imp
messageBlockHandler.post(new RawMessageBlock(rawMessage, webSocket));
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.java_websocket.server.WebSocketServer#onMessage(org.java_websocket.WebSocket, java.lang.String)
+ */
+ @Override
+ public void onMessage(final WebSocket webSocket, final String stringMessage) {
+ messageBlockHandler.post(stringMessage);
+ }
+
/**
* Register a subscriber class to the raw message handler.
*
* @param subscriber the subscriber
*/
@Override
- public void addMessageListener(final MessageListener<MESSAGE> subscriber) {
+ public void addMessageListener(final MessageListener<M> subscriber) {
rawMessageHandler.registerDataForwarder(subscriber);
}
@@ -111,20 +121,10 @@ abstract class InternalMessageBusServer<MESSAGE> extends WebSocketServerImpl imp
* @param subscriber the subscriber
*/
@Override
- public void removeMessageListener(final MessageListener<MESSAGE> subscriber) {
+ public void removeMessageListener(final MessageListener<M> subscriber) {
rawMessageHandler.unRegisterDataForwarder(subscriber);
}
- /*
- * (non-Javadoc)
- *
- * @see org.java_websocket.server.WebSocketServer#onMessage(org.java_websocket.WebSocket, java.lang.String)
- */
- @Override
- public void onMessage(final WebSocket webSocket, final String stringMessage) {
- messageBlockHandler.post(stringMessage);
- }
-
/**
* Stop the thread handling message forwarding.
*/
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java
index 389d04dcc..d5ef40b5b 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java
@@ -34,9 +34,9 @@ import org.slf4j.ext.XLoggerFactory;
* A messaging server implementation using web socket.
*
* @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
- * @param <MESSAGE> the generic type of message being passed
+ * @param <M> the generic type of message being passed
*/
-public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE> {
+public class MessageServerImpl<M> extends InternalMessageBusServer<M> {
// The logger for this class
private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessageServerImpl.class);
@@ -44,7 +44,7 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE
private static final String PROTOCOL = "ws://";
// URI of this server
- private final String connectionURI;
+ private final String connectionUri;
// Indicates if the web socket server is started or not
private boolean isStarted = false;
@@ -60,8 +60,8 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE
LOGGER.entry(address);
// Compose the Web Socket URI
- connectionURI = PROTOCOL + address.getHostString() + ":" + address.getPort();
- LOGGER.debug("Server connection URI: {}", connectionURI);
+ connectionUri = PROTOCOL + address.getHostString() + ":" + address.getPort();
+ LOGGER.debug("Server connection URI: {}", connectionUri);
LOGGER.exit();
}
@@ -106,27 +106,25 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE
}
/**
- * This method returns the current connection URI , if the server started otherwise it throws
- * {@link IllegalStateException}.
+ * Return the current connection URI.
*
* @return connection URI
*/
- public String getConnectionURI() {
- if (connectionURI == null) {
+ public String getConnectionUrl() {
+ if (connectionUri == null) {
throw new IllegalStateException("URI not set - The server is not started");
}
- return connectionURI;
+ return connectionUri;
}
/*
* (non-Javadoc)
*
- * @see
- * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex
- * .core. infrastructure. messaging.MessageHolder)
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex .core.
+ * infrastructure. messaging.MessageHolder)
*/
@Override
- public void send(final MessageHolder<MESSAGE> message) {
+ public void send(final MessageHolder<M> message) {
// Send the incoming message to all clients connected to this web socket
final Collection<WebSocket> connections = getConnections();
for (final WebSocket webSocket : connections) {
@@ -137,8 +135,7 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE
/*
* (non-Javadoc)
*
- * @see
- * org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
*/
@Override
public void send(final String messageString) {
@@ -160,6 +157,6 @@ public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE
@Override
public void onStart() {
- LOGGER.debug("started deployment server on URI: {}", connectionURI);
+ LOGGER.debug("started deployment server on URI: {}", connectionUri);
}
}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageClient.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WsStringMessageClient.java
index 00ade8047..28afde03b 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageClient.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WsStringMessageClient.java
@@ -37,15 +37,18 @@ import org.slf4j.ext.XLoggerFactory;
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
-public class WSStringMessageClient implements WSStringMessager {
- private static final XLogger LOGGER = XLoggerFactory.getXLogger(WSStringMessageClient.class);
+public class WsStringMessageClient implements WsStringMessager {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(WsStringMessageClient.class);
+
+ // Repeated string constants
+ private static final String MESSAGE_PREAMBLE = "web socket event consumer client to \"";
// Message service factory and the message service itself
private final MessagingServiceFactory<String> factory = new MessagingServiceFactory<>();
private MessagingService<String> service = null;
// The listener to use for reception of strings
- private WSStringMessageListener wsStringMessageListener;
+ private WsStringMessageListener wsStringMessageListener;
// Address of the server
private final String host;
@@ -58,7 +61,7 @@ public class WSStringMessageClient implements WSStringMessager {
* @param host the host of the server
* @param port the port of the server
*/
- public WSStringMessageClient(final String host, final int port) {
+ public WsStringMessageClient(final String host, final int port) {
this.host = host;
this.port = port;
}
@@ -71,22 +74,24 @@ public class WSStringMessageClient implements WSStringMessager {
* apex. core.infrastructure.messaging. stringmessaging.WSStringMessageListener)
*/
@Override
- public void start(final WSStringMessageListener newWsStringMessageListener) throws MessagingException {
+ public void start(final WsStringMessageListener newWsStringMessageListener) throws MessagingException {
this.wsStringMessageListener = newWsStringMessageListener;
uriString = "ws://" + host + ":" + port;
- LOGGER.entry("web socket event consumer client to \"" + uriString + "\" starting . . .");
+ String messagePreamble = MESSAGE_PREAMBLE + uriString + "\" ";
+ LOGGER.entry(messagePreamble + "starting . . .");
try {
service = factory.createClient(new URI(uriString));
- service.addMessageListener(new WSStringMessageClientListener());
+ service.addMessageListener(new WsStringMessageClientListener());
service.startConnection();
} catch (final Exception e) {
- LOGGER.warn("web socket event consumer client to \"" + uriString + "\" start failed", e);
- throw new MessagingException("web socket event consumer client to \"" + uriString + "\" start failed", e);
+ String message = messagePreamble + "start failed";
+ LOGGER.warn(message, e);
+ throw new MessagingException(message, e);
}
- LOGGER.exit("web socket event consumer client to \"" + uriString + "\" started");
+ LOGGER.exit(messagePreamble + "started");
}
/*
@@ -96,9 +101,9 @@ public class WSStringMessageClient implements WSStringMessager {
*/
@Override
public void stop() {
- LOGGER.entry("web socket event consumer client to \"" + uriString + "\" stopping . . .");
+ LOGGER.entry(MESSAGE_PREAMBLE + uriString + "\" stopping . . .");
service.stopConnection();
- LOGGER.exit("web socket event consumer client to \"" + uriString + "\" stopped");
+ LOGGER.exit(MESSAGE_PREAMBLE + uriString + "\" stopped");
}
/*
@@ -113,14 +118,15 @@ public class WSStringMessageClient implements WSStringMessager {
service.send(stringMessage);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("message sent to server: " + stringMessage);
+ String message = "message sent to server: " + stringMessage;
+ LOGGER.debug(message);
}
}
/**
* The Class WSStringMessageClientListener.
*/
- private class WSStringMessageClientListener implements MessageListener<String> {
+ private class WsStringMessageClientListener implements MessageListener<String> {
/*
* (non-Javadoc)
*
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageListener.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WsStringMessageListener.java
index e524b43d7..0a5e147cc 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageListener.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WsStringMessageListener.java
@@ -25,7 +25,7 @@ package org.onap.policy.apex.core.infrastructure.messaging.stringmessaging;
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
-public interface WSStringMessageListener {
+public interface WsStringMessageListener {
/**
* Receive a string coming off a web socket.
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageServer.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WsStringMessageServer.java
index 4da478f6a..3e8db268c 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageServer.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WsStringMessageServer.java
@@ -39,15 +39,15 @@ import org.slf4j.ext.XLoggerFactory;
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
-public class WSStringMessageServer implements WSStringMessager {
- private static final XLogger LOGGER = XLoggerFactory.getXLogger(WSStringMessageServer.class);
+public class WsStringMessageServer implements WsStringMessager {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(WsStringMessageServer.class);
// Message service factory and the message service itself
private final MessagingServiceFactory<String> factory = new MessagingServiceFactory<>();
private MessagingService<String> service = null;
// The listener to use for reception of strings
- private WSStringMessageListener wsStringMessageListener;
+ private WsStringMessageListener wsStringMessageListener;
// Address of the server
private final int port;
@@ -57,7 +57,7 @@ public class WSStringMessageServer implements WSStringMessager {
*
* @param port the port of the server
*/
- public WSStringMessageServer(final int port) {
+ public WsStringMessageServer(final int port) {
this.port = port;
}
@@ -69,19 +69,19 @@ public class WSStringMessageServer implements WSStringMessager {
* apex. core.infrastructure.messaging. stringmessaging.WSStringMessageListener)
*/
@Override
- public void start(final WSStringMessageListener newWsStringMessageListener) throws MessagingException {
+ public void start(final WsStringMessageListener newWsStringMessageListener) throws MessagingException {
this.wsStringMessageListener = newWsStringMessageListener;
LOGGER.entry("web socket event consumer server starting . . .");
try {
- final InetAddress addrLan = MessagingUtils.getLocalHostLANAddress();
+ final InetAddress addrLan = MessagingUtils.getLocalHostLanAddress();
LOGGER.debug("web socket string message server LAN address=" + addrLan.getHostAddress());
final InetAddress addr = InetAddress.getLocalHost();
LOGGER.debug("web socket string message server host address=" + addr.getHostAddress());
service = factory.createServer(new InetSocketAddress(port));
- service.addMessageListener(new WSStringMessageServerListener());
+ service.addMessageListener(new WsStringMessageServerListener());
service.startConnection();
} catch (final Exception e) {
@@ -115,14 +115,14 @@ public class WSStringMessageServer implements WSStringMessager {
public void sendString(final String stringMessage) {
service.send(stringMessage);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("server sent message: " + stringMessage);
+ LOGGER.debug("server sent message: {}", stringMessage);
}
}
/**
* The listener for strings coming into the server.
*/
- private class WSStringMessageServerListener implements MessageListener<String> {
+ private class WsStringMessageServerListener implements MessageListener<String> {
/*
* (non-Javadoc)
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessager.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WsStringMessager.java
index a2781e932..2a731b0eb 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessager.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WsStringMessager.java
@@ -27,7 +27,7 @@ import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
-public interface WSStringMessager {
+public interface WsStringMessager {
/**
* Start the string message sender.
@@ -35,7 +35,7 @@ public interface WSStringMessager {
* @param wsStringMessageListener the listener to use for listening for string messages
* @throws MessagingException the messaging exception
*/
- void start(WSStringMessageListener wsStringMessageListener) throws MessagingException;
+ void start(WsStringMessageListener wsStringMessageListener) throws MessagingException;
/**
* Stop the string messaging sender.
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java
index 66edd2f1d..a501a66d6 100644
--- a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java
@@ -43,7 +43,7 @@ public final class MessagingUtils {
private static final int LOWEST_USER_PORT = 1024;
/**
- * Port number is an unsigned 16-bit integer, so maximum port is 65535
+ * Port number is an unsigned 16-bit integer, so maximum port is 65535.
*/
private static final int MAX_PORT_RANGE = 65535;
@@ -71,7 +71,7 @@ public final class MessagingUtils {
return port;
}
LOGGER.debug("Port {} is not available", port);
- throw new RuntimeException("could not allocate requested port: " + port);
+ throw new IllegalArgumentException("could not allocate requested port: " + port);
}
/**
@@ -96,7 +96,7 @@ public final class MessagingUtils {
LOGGER.debug("Port {} is not available", availablePort);
availablePort++;
}
- throw new RuntimeException("could not find free available");
+ throw new IllegalArgumentException("could not find free available");
}
/**
@@ -149,7 +149,7 @@ public final class MessagingUtils {
* @return an Internet address
* @throws UnknownHostException if the address of the local host cannot be found
*/
- public static InetAddress getLocalHostLANAddress() throws UnknownHostException {
+ public static InetAddress getLocalHostLanAddress() throws UnknownHostException {
try {
InetAddress candidateAddress = null;
// Iterate all NICs (network interface cards)...
@@ -225,8 +225,7 @@ public final class MessagingUtils {
} finally {
flushAndClose(oos, bytesOut);
}
- final byte[] bytes = bytesOut.toByteArray();
- return bytes;
+ return bytesOut.toByteArray();
}
/**
diff --git a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/EndToEndStringMessagingTest.java b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/EndToEndStringMessagingTest.java
index 21c5ee984..c9d56ef2c 100644
--- a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/EndToEndStringMessagingTest.java
+++ b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/EndToEndStringMessagingTest.java
@@ -24,9 +24,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;
-import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageClient;
-import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageListener;
-import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageServer;
+import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageClient;
+import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageListener;
+import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageServer;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
@@ -40,22 +40,22 @@ public class EndToEndStringMessagingTest {
// Logger for this class
private static final XLogger logger = XLoggerFactory.getXLogger(EndToEndStringMessagingTest.class);
- private WSStringMessageServer server;
- private WSStringMessageClient client;
+ private WsStringMessageServer server;
+ private WsStringMessageClient client;
private boolean finished = false;
@Test
public void testEndToEndMessaging() throws MessagingException {
logger.debug("end to end messaging test starting . . .");
- server = new WSStringMessageServer(44441);
+ server = new WsStringMessageServer(44441);
assertNotNull(server);
- server.start(new WSStringServerMessageListener());
+ server.start(new WsStringServerMessageListener());
try {
- client = new WSStringMessageClient("localhost", 44441);
+ client = new WsStringMessageClient("localhost", 44441);
assertNotNull(client);
- client.start(new WSStringClientMessageListener());
+ client.start(new WsStringClientMessageListener());
client.sendString("Hello, client here");
@@ -74,7 +74,7 @@ public class EndToEndStringMessagingTest {
logger.debug("end to end messaging test finished");
}
- private class WSStringServerMessageListener implements WSStringMessageListener {
+ private class WsStringServerMessageListener implements WsStringMessageListener {
@Override
public void receiveString(final String stringMessage) {
logger.debug(stringMessage);
@@ -83,7 +83,7 @@ public class EndToEndStringMessagingTest {
}
}
- private class WSStringClientMessageListener implements WSStringMessageListener {
+ private class WsStringClientMessageListener implements WsStringMessageListener {
@Override
public void receiveString(final String stringMessage) {
logger.debug(stringMessage);
diff --git a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/StringTestServer.java b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/StringTestServer.java
index 09fa62d59..30590c0c0 100644
--- a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/StringTestServer.java
+++ b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/StringTestServer.java
@@ -22,18 +22,29 @@ package org.onap.policy.apex.core.infrastructure.messaging;
import static org.junit.Assert.assertNotNull;
-import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageListener;
-import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageServer;
+import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageListener;
+import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageServer;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+// TODO: Auto-generated Javadoc
+/**
+ * The Class StringTestServer.
+ */
public class StringTestServer {
- private WSStringMessageServer server;
+ private WsStringMessageServer server;
+ /**
+ * Create a string test server.
+ *
+ * @param port port to use
+ * @param timeToLive time to live
+ * @throws MessagingException exceptions on messages
+ */
public StringTestServer(final int port, long timeToLive) throws MessagingException {
System.out.println("StringTestServer starting on port " + port + " for " + timeToLive + " seconds . . .");
- server = new WSStringMessageServer(port);
+ server = new WsStringMessageServer(port);
assertNotNull(server);
- server.start(new WSStringServerMessageListener());
+ server.start(new WsStringServerMessageListener());
System.out.println("StringTestServer started on port " + port + " for " + timeToLive + " seconds");
@@ -45,7 +56,23 @@ public class StringTestServer {
System.out.println("StringTestServer completed");
}
- private class WSStringServerMessageListener implements WSStringMessageListener {
+ /**
+ * The listener interface for receiving WSStringServerMessage events. The class that is interested in processing a
+ * WSStringServerMessage event implements this interface, and the object created with that class is registered with
+ * a component using the component's <code>addWSStringServerMessageListener</code> method. When the
+ * WSStringServerMessage event occurs, that object's appropriate method is invoked.
+ *
+ * @see WSStringServerMessageEvent
+ */
+ private class WsStringServerMessageListener implements WsStringMessageListener {
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WsStringMessageListener#receiveString(java
+ * .lang.String)
+ */
@Override
public void receiveString(final String stringMessage) {
System.out.println("Server received string \"" + stringMessage + "\"");
@@ -53,6 +80,12 @@ public class StringTestServer {
}
}
+ /**
+ * The main method.
+ *
+ * @param args the arguments
+ * @throws MessagingException the messaging exception
+ */
public static void main(final String[] args) throws MessagingException {
if (args.length != 2) {
System.err.println("Usage: StringTestServer port timeToLive");