summaryrefslogtreecommitdiffstats
path: root/core/core-infrastructure/src
diff options
context:
space:
mode:
authorramverma <ram.krishna.verma@ericsson.com>2018-06-01 11:51:36 +0100
committerramverma <ram.krishna.verma@ericsson.com>2018-06-04 10:50:44 +0100
commit37d6fd9069eb30d88c4ad80b5f35099ed173cc13 (patch)
tree0f30a71577644047feee43bd8857dc5a82a51c87 /core/core-infrastructure/src
parent5722440b2eb8ff1923dda9d4d856f0adc1ac8e6f (diff)
Adding apex core module to apex-pdp
Change-Id: I4bfe1df3e44fe62ff6789e813e59836e267ab3b2 Issue-ID: POLICY-858 Signed-off-by: ramverma <ram.krishna.verma@ericsson.com>
Diffstat (limited to 'core/core-infrastructure/src')
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/JavaHandlingException.java58
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/ClassUtils.java249
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/package-info.java27
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/package-info.java27
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageHolder.java168
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageListener.java47
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingException.java49
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingService.java76
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingServiceFactory.java59
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java253
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/WebSocketMessageListener.java58
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/InternalMessageBusClient.java131
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java162
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/WebSocketClientImpl.java83
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/package-info.java27
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlock.java70
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java128
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/RawMessageBlock.java67
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/package-info.java27
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/package-info.java27
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java134
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java161
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/WebSocketServerImpl.java89
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/package-info.java27
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/package-info.java27
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageClient.java147
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageListener.java36
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageServer.java150
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessager.java51
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/package-info.java27
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java260
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/package-info.java27
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/package-info.java27
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ApplicationThreadFactory.java142
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java50
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/package-info.java27
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/XPathReader.java115
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/package-info.java27
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassBuilder.java133
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassByteCodeFileObject.java89
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassCompilationUnit.java83
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassLoader.java61
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleFileManager.java79
-rw-r--r--core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/package-info.java28
-rw-r--r--core/core-infrastructure/src/main/resources/logback.xml50
-rw-r--r--core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/EndToEndStringMessagingTest.java87
-rw-r--r--core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/StringTestServer.java83
-rw-r--r--core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/TestMessageListener.java65
-rw-r--r--core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTest.java92
-rw-r--r--core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTestThread.java110
-rw-r--r--core/core-infrastructure/src/test/resources/logback-test.xml70
51 files changed, 4347 insertions, 0 deletions
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/JavaHandlingException.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/JavaHandlingException.java
new file mode 100644
index 000000000..f6ef68105
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/JavaHandlingException.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.java;
+
+/**
+ * This class will be called if an error occurs in Java handling.
+ *
+ * @author Liam Fallon
+ */
+public class JavaHandlingException extends Exception {
+ private static final long serialVersionUID = -6375859029774312663L;
+
+ /**
+ * Instantiates a new Java handling exception.
+ *
+ * @param message the message
+ */
+ public JavaHandlingException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Instantiates a new Java handling exception.
+ *
+ * @param e the exception to wrap
+ */
+ public JavaHandlingException(final Exception e) {
+ super(e);
+ }
+
+ /**
+ * Instantiates a new Java handling exception.
+ *
+ * @param message the message
+ * @param e the exception to wrap
+ */
+ public JavaHandlingException(final String message, final Exception e) {
+ super(message, e);
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/ClassUtils.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/ClassUtils.java
new file mode 100644
index 000000000..919d1b122
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/ClassUtils.java
@@ -0,0 +1,249 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.java.classes;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This class is a utility class used to find Java classes on the class path, in directories, and in Jar files.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public abstract class ClassUtils {
+ // Get a reference to the logger
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(ClassUtils.class);
+
+ // The boot directory in Java for predefined JARs
+ private static final String SUN_BOOT_LIBRARY_PATH = "sun.boot.library.path";
+
+ // Token for Classes directory in paths
+ private static final String CLASSES_TOKEN = "/classes/";
+
+ // Token for library fragment in path
+ private static final String LIBRARAY_PATH_TOKEN = "/lib";
+
+ /**
+ * Private constructor used to prevent sub class instantiation.
+ */
+ private ClassUtils() {}
+
+ /**
+ * Get the class names of all classes on the class path. WARNING: This is a heavy call, use sparingly
+ *
+ * @return a set of class names for all classes in the class path
+ */
+ public static Set<String> getClassNames() {
+ // The return set of class names
+ final Set<String> classNameSet = new TreeSet<>();
+
+ try {
+ // The library path for predefined classes in Java
+ String sunBootLibraryPathString = System.getProperty(SUN_BOOT_LIBRARY_PATH);
+
+ // Check it exists and has a "lib" in it
+ if (sunBootLibraryPathString != null && sunBootLibraryPathString.contains(LIBRARAY_PATH_TOKEN)) {
+ // Strip any superfluous trailer from path
+ sunBootLibraryPathString = sunBootLibraryPathString.substring(0,
+ sunBootLibraryPathString.lastIndexOf(LIBRARAY_PATH_TOKEN) + LIBRARAY_PATH_TOKEN.length());
+
+ final File bootLibraryFile = new File(sunBootLibraryPathString);
+ // The set used to hold class names is populated with predefined Java classes
+ classNameSet.addAll(processDir(bootLibraryFile, ""));
+ }
+
+ // Get the entries on the class path
+ URL[] urls = ((URLClassLoader) ClassLoader.getSystemClassLoader()).getURLs();
+
+ // Try get the classes in the bootstrap loader
+ try {
+ final Class<?> nullclassloader = Class.forName("sun.misc.Launcher");
+ if (nullclassloader != null) {
+ // There a long way and a short way, Short way: causes a warning that cannot be suppressed
+ // URL[] moreurls = sun.misc.Launcher.getBootstrapClassPath().getURLs();
+ // long way:
+ Method m = nullclassloader.getMethod("getBootstrapClassPath");
+ if (m != null) {
+ final Object cp = m.invoke(null, (Object[]) null);
+ if (cp != null) {
+ m = cp.getClass().getMethod("getURLs");
+ if (m != null) {
+ final URL[] moreurls = (URL[]) (m.invoke(cp, (Object[]) null));
+ if (moreurls != null && moreurls.length > 0) {
+ if (urls.length == 0) {
+ urls = moreurls;
+ } else {
+ final URL[] result = Arrays.copyOf(urls, urls.length + moreurls.length);
+ System.arraycopy(moreurls, 0, result, urls.length, moreurls.length);
+ urls = result;
+ }
+ }
+ }
+ }
+ }
+ // end long way!
+ }
+ } catch (final ClassNotFoundException e) {
+ LOGGER.warn("Failed to find default path for JRE libraries", e);
+ }
+
+ // Iterate over the class path entries
+ for (final URL url : urls) {
+ if (url == null || url.getFile() == null) {
+ continue;
+ }
+ final File urlFile = new File(url.getFile());
+ // Directories may contain ".class" files
+ if (urlFile.isDirectory()) {
+ classNameSet.addAll(processDir(urlFile, url.getFile()));
+ }
+ // JARs are processed as well
+ else if (url.getFile().endsWith(".jar")) {
+ classNameSet.addAll(processJar(urlFile));
+ } else {
+ // It's a resource or some other non-executable thing
+ continue;
+ }
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("could not get the names of Java classes", e);
+ }
+
+ return classNameSet;
+ }
+
+ /**
+ * Find all classes in directories and JARs in those directories.
+ *
+ * @param classDirectory The directory to search for classes
+ * @param rootDir The root directory, to be removed from absolute paths
+ * @return a set of classes which may be empty
+ * @throws Exception on errors processing directories
+ */
+ public static Set<String> processDir(final File classDirectory, final String rootDir) throws Exception {
+ // The return set
+ final TreeSet<String> classNameSet = new TreeSet<>();
+
+ // Iterate over the directory
+ if (classDirectory == null || !classDirectory.isDirectory()) {
+ return classNameSet;
+ }
+ for (final File child : classDirectory.listFiles()) {
+ if (child.isDirectory()) {
+ // Recurse down
+ classNameSet.addAll(processDir(child, rootDir));
+ } else if (child.getName().endsWith(".jar")) {
+ // Process the JAR
+ classNameSet.addAll(processJar(child));
+ } else if (child.getName().endsWith(".class") && !child.getName().contains("$")) {
+ // Process the ".class" file
+ classNameSet.add(
+ child.getAbsolutePath().replace(rootDir, "").replaceFirst("\\.class$", "").replace('/', '.'));
+ } else {
+ continue;
+ }
+ }
+ return classNameSet;
+ }
+
+ /**
+ * Condition the file name as a class name.
+ *
+ * @param fileNameIn The file name to convert to a class name
+ * @return the conditioned class name
+ */
+ public static String processFileName(final String fileNameIn) {
+ String fileName = fileNameIn;
+
+ if (fileName == null) {
+ return null;
+ }
+ final int classesPos = fileName.indexOf(CLASSES_TOKEN);
+
+ if (classesPos != -1) {
+ fileName = fileName.substring(classesPos + CLASSES_TOKEN.length());
+ }
+
+ return fileName.replaceFirst("\\.class$", "").replace('/', '.');
+ }
+
+ /**
+ * Read all the class names from a Jar.
+ *
+ * @param jarFile the JAR file
+ * @return a set of class names
+ * @throws Exception on errors processing JARs
+ */
+ public static Set<String> processJar(final File jarFile) throws Exception {
+ // Pass the file as an input stream
+ return processJar(new FileInputStream(jarFile.getAbsolutePath()));
+ }
+
+ /**
+ * Read all the class names from a Jar.
+ *
+ * @param jarInputStream the JAR input stream
+ * @return a set of class names
+ * @throws Exception on errors processing JARs
+ */
+ public static Set<String> processJar(final InputStream jarInputStream) throws Exception {
+ // The return set
+ final TreeSet<String> classPathSet = new TreeSet<>();
+
+ if (jarInputStream == null) {
+ return classPathSet;
+ }
+ // JARs are ZIP files
+ final ZipInputStream zip = new ZipInputStream(jarInputStream);
+
+ // Iterate over each entry in the JAR
+ for (ZipEntry entry = zip.getNextEntry(); entry != null; entry = zip.getNextEntry()) {
+ if (!entry.isDirectory() && entry.getName().endsWith(".class") && !entry.getName().contains("$")) {
+ classPathSet.add(entry.getName().replaceFirst("\\.class$", "").replace('/', '.'));
+ }
+ }
+ zip.close();
+ return classPathSet;
+ }
+
+ /**
+ * The main method exercises this class for test purposes.
+ *
+ * @param args the args
+ */
+ public static void main(final String[] args) {
+ for (final String clz : getClassNames()) {
+ System.out.println("Found class: " + clz);
+ }
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/package-info.java
new file mode 100644
index 000000000..c356580b3
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Contains support to find Java classes on the class path, in directories and in Jar files.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+
+package org.onap.policy.apex.core.infrastructure.java.classes;
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/package-info.java
new file mode 100644
index 000000000..5a8b51132
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Allows Java classes to be created by compiling Java source code and generating classes on the fly.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+
+package org.onap.policy.apex.core.infrastructure.java;
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageHolder.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageHolder.java
new file mode 100644
index 000000000..f74ffa0b3
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageHolder.java
@@ -0,0 +1,168 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class MessageHolder holds a set of messages to be sent as a single block of messages in this messaging
+ * implementation.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ * @param <MESSAGE> the generic type of message being handled by a message holder instance
+ */
+public class MessageHolder<MESSAGE> implements Serializable {
+ private static final int HASH_PRIME = 31;
+ private static final int FOUR_BYTES = 32;
+
+ // Serial ID
+ private static final long serialVersionUID = 1235487535388793719L;
+
+ // Get a reference to the logger
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessageHolder.class);
+
+ // Properties of the message holder
+ private final long creationTime;
+ private final InetAddress senderHostAddress;
+
+ // Sequence of message in the message holder
+ private final List<MESSAGE> messages;
+
+ /**
+ * Constructor, create the message holder.
+ *
+ * @param senderHostAddress the host address of the sender of the message holder container
+ */
+ public MessageHolder(final InetAddress senderHostAddress) {
+ LOGGER.entry(senderHostAddress);
+ messages = new ArrayList<>();
+ this.senderHostAddress = senderHostAddress;
+ creationTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Return the messages in this message holder.
+ *
+ * @return the messages
+ */
+ public List<MESSAGE> getMessages() {
+ return messages;
+ }
+
+ /**
+ * Adds a message to this message holder.
+ *
+ * @param message the message to add
+ */
+ public void addMessage(final MESSAGE message) {
+ if (!messages.contains(message)) {
+ messages.add(message);
+ } else {
+ LOGGER.warn("duplicate message {} added to message holder", message);
+ }
+ }
+
+ /**
+ * Gets the creation time.
+ *
+ * @return the creation time
+ */
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ /**
+ * Gets the sender host address.
+ *
+ * @return the sender host address
+ */
+ public InetAddress getSenderHostAddress() {
+ return senderHostAddress;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return "ApexCommandProtocol [creationTime=" + creationTime + ", senderHostAddress=" + senderHostAddress + "]";
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ final int prime = HASH_PRIME;
+ int result = 1;
+ result = prime * result + ((senderHostAddress == null) ? 0 : senderHostAddress.hashCode());
+ result = prime * result + ((messages == null) ? 0 : messages.hashCode());
+ result = prime * result + (int) (creationTime ^ (creationTime >>> FOUR_BYTES));
+ return result;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final MessageHolder<?> other = (MessageHolder<?>) obj;
+ if (senderHostAddress == null) {
+ if (other.senderHostAddress != null) {
+ return false;
+ }
+ } else if (!senderHostAddress.equals(other.senderHostAddress)) {
+ return false;
+ }
+ if (messages == null) {
+ if (other.messages != null) {
+ return false;
+ }
+ } else if (!messages.equals(other.messages)) {
+ return false;
+ }
+ if (creationTime != other.creationTime) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageListener.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageListener.java
new file mode 100644
index 000000000..c8b132423
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageListener.java
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging;
+
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
+
+/**
+ * The listener interface for receiving message events. The class that is interested in processing a message event
+ * implements this interface.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ * @param <MESSAGE> of message of any given type that is being listened for and handled
+ */
+public interface MessageListener<MESSAGE> {
+
+ /**
+ * This method is called when a message block is received on a web socket and is to be forwarded to a listener.
+ *
+ * @param data the message data containing a message
+ */
+ void onMessage(MessageBlock<MESSAGE> data);
+
+ /**
+ * This method is called when a string message is received on a web socket and is to be forwarded to a listener.
+ *
+ * @param messageString the message string
+ */
+ void onMessage(String messageString);
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingException.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingException.java
new file mode 100644
index 000000000..ef435b2a5
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingException.java
@@ -0,0 +1,49 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging;
+
+/**
+ * This class will be called if an error occurs in Java handling.
+ *
+ * @author Liam Fallon
+ */
+public class MessagingException extends Exception {
+ private static final long serialVersionUID = -6375859029774312663L;
+
+ /**
+ * Instantiates a new messaging exception.
+ *
+ * @param message the message
+ */
+ public MessagingException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Instantiates a new messaging exception.
+ *
+ * @param message the message
+ * @param e the e
+ */
+ public MessagingException(final String message, final Exception e) {
+ super(message, e);
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingService.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingService.java
new file mode 100644
index 000000000..7e91b95ea
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingService.java
@@ -0,0 +1,76 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging;
+
+/**
+ * The Interface MessagingService specifies the methods that must be implemented by any implementation providing Apex
+ * messaging.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ * @param <MESSAGE> the type of message being passed by an implementation of Apex messaging
+ */
+public interface MessagingService<MESSAGE> {
+
+ /**
+ * Start the messaging connection.
+ */
+ void startConnection();
+
+ /**
+ * Stop the messaging connection.
+ */
+ void stopConnection();
+
+ /**
+ * Checks if the messaging connection is started.
+ *
+ * @return true, if is started
+ */
+ boolean isStarted();
+
+ /**
+ * Send a block of messages on the connection, the messages are contained in the the message holder container.
+ *
+ * @param messageHolder The message holder holding the messages to be sent
+ */
+ void send(MessageHolder<MESSAGE> messageHolder);
+
+ /**
+ * Send a string message on the connection.
+ *
+ * @param messageString The message string to be sent
+ */
+ void send(String messageString);
+
+ /**
+ * Adds a message listener that will be called when a message is received by this messaging service implementation.
+ *
+ * @param messageListener the message listener
+ */
+ void addMessageListener(MessageListener<MESSAGE> messageListener);
+
+ /**
+ * Removes the message listener.
+ *
+ * @param messageListener the message listener
+ */
+ void removeMessageListener(MessageListener<MESSAGE> messageListener);
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingServiceFactory.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingServiceFactory.java
new file mode 100644
index 000000000..1d08fac74
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingServiceFactory.java
@@ -0,0 +1,59 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.client.MessagingClient;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server.MessageServerImpl;
+
+/**
+ * A factory class to create a "server" or "client" type Messaging Service.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ * @param <MESSAGE> the generic type of message to be handled by this messaging service
+ */
+public class MessagingServiceFactory<MESSAGE> {
+
+ /**
+ * Create a web socket server instance and returns to the caller.
+ *
+ * @param address the address of the server machine
+ * @return the messaging service
+ */
+ public MessagingService<MESSAGE> createServer(final InetSocketAddress address) {
+ return new MessageServerImpl<>(address);
+ }
+
+ /**
+ * Create a web socket client instance and returns to the caller.
+ *
+ * @param uri the URI of the server to connect to
+ * @return an instance of {@link MessagingService}
+ */
+ public MessagingService<MESSAGE> createClient(final URI uri) {
+ if (uri == null) {
+ throw new IllegalArgumentException("URI cannot be null");
+ }
+ return new MessagingClient<>(uri);
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java
new file mode 100644
index 000000000..534bee8af
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java
@@ -0,0 +1,253 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws;
+
+import com.google.common.eventbus.Subscribe;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
+import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlockHandler;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.RawMessageBlock;
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards the messages to the
+ * DataHandler instance that has subscribed to the RawMessageHandler instance.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ * @param <MESSAGE> the generic type of message being received
+ */
+public class RawMessageHandler<MESSAGE> implements WebSocketMessageListener<MESSAGE>, Runnable {
+ // The logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(RawMessageHandler.class);
+
+ // The amount of time to sleep during shutdown for the thread of this message handler to stop
+ private static final int SHUTDOWN_WAIT_TIME = 10;
+
+ // The timeout to wait between queue poll timeouts in milliseconds
+ private static final long QUEUE_POLL_TIMEOUT = 50;
+
+ // A queue that temporarily holds message blocks
+ private final BlockingQueue<MessageBlock<MESSAGE>> messageBlockQueue = new LinkedBlockingDeque<>();
+
+ // A queue that temporarily holds message blocks
+ private final BlockingQueue<String> stringMessageQueue = new LinkedBlockingDeque<>();
+
+ // Client applications that have subscribed for messages
+ private final MessageBlockHandler<MESSAGE> dataHandler = new MessageBlockHandler<MESSAGE>("data-processor");
+
+ // The thread that the raw message handler is receiving messages on
+ private Thread thisThread = null;
+
+ /**
+ * This method is called by the class with which this message listener has been registered.
+ *
+ * @param incomingData the data forwarded by the message reception class
+ */
+ @Override
+ @Subscribe
+ public void onMessage(final RawMessageBlock incomingData) {
+ // Sanity check and get incoming data
+ ByteBuffer dataByteBuffer = null;
+ if (incomingData != null && incomingData.getMessage() != null) {
+ dataByteBuffer = incomingData.getMessage();
+ } else {
+ return;
+ }
+
+ // Read the messages from the web socket and place them on the message queue for handling by the queue
+ // processing thread
+ ObjectInputStream ois = null;
+ try {
+ ois = new ObjectInputStream(new ByteArrayInputStream(dataByteBuffer.array()));
+ @SuppressWarnings("unchecked")
+ final MessageHolder<MESSAGE> messageHolder = (MessageHolder<MESSAGE>) ois.readObject();
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("message {} recieved from the client {} ", messageHolder.toString(),
+ messageHolder == null ? "Apex Engine " : messageHolder.getSenderHostAddress());
+ }
+
+ final List<MESSAGE> messages = messageHolder.getMessages();
+ if (messages != null) {
+ messageBlockQueue.add(new MessageBlock<MESSAGE>(messages, incomingData.getConn()));
+ }
+ } catch (IOException | ClassNotFoundException e) {
+ LOGGER.error("Failed to process message received");
+ LOGGER.catching(e);
+ } finally {
+ closeObjectStream(ois);
+ }
+ }
+
+ /**
+ * This method is called when a string message is received on a web socket and is to be forwarded to a listener.
+ *
+ * @param messageString the message string
+ */
+ @Override
+ @Subscribe
+ public void onMessage(final String messageString) {
+ if (messageString == null) {
+ return;
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("message {} recieved from the client {} ", messageString);
+ }
+ stringMessageQueue.add(messageString);
+ }
+
+ /**
+ * Close the {@link ObjectInputStream} stream.
+ *
+ * @param ois is an instance of {@link ObjectInputStream}
+ */
+ private void closeObjectStream(final ObjectInputStream ois) {
+ if (ois != null) {
+ try {
+ ois.close();
+ } catch (final IOException e) {
+ LOGGER.catching(e);
+ }
+ }
+ }
+
+ /**
+ * This thread monitors the message queue and processes messages as they appear on the queue.
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ LOGGER.debug("raw message listening started");
+ thisThread = Thread.currentThread();
+
+ // Run until termination
+ while (thisThread.isAlive() && !thisThread.isInterrupted()) {
+ try {
+ // Read message block messages from the queue and pass it to the data handler
+ MessageBlock<MESSAGE> messageBlock = null;
+ while ((messageBlock = messageBlockQueue.poll(1, TimeUnit.MILLISECONDS)) != null) {
+ dataHandler.post(messageBlock);
+ }
+ } catch (final InterruptedException e) {
+ LOGGER.debug("raw message listening has been interrupted");
+ break;
+ }
+
+ try {
+ // Read string messages from the queue and pass it to the data handler
+ String stringMessage = null;
+ while ((stringMessage = stringMessageQueue.poll(1, TimeUnit.MILLISECONDS)) != null) {
+ dataHandler.post(stringMessage);
+ }
+ } catch (final InterruptedException e) {
+ LOGGER.debug("raw message listening has been interrupted");
+ break;
+ }
+
+ // Wait for new messages
+ try {
+ Thread.sleep(QUEUE_POLL_TIMEOUT);
+ } catch (final InterruptedException e) {
+ LOGGER.debug("raw message listening has been interrupted");
+ break;
+ }
+ }
+
+ LOGGER.debug("raw message listening stopped");
+ }
+
+ /**
+ * Shutdown the message handler.
+ */
+ public void shutdown() {
+ LOGGER.entry("shutting down raw message listening . . .");
+
+ // Interrupt the message handling thread
+ thisThread.interrupt();
+
+ // Wait for thread shutdown
+ while (thisThread.isAlive()) {
+ ThreadUtilities.sleep(SHUTDOWN_WAIT_TIME);
+ }
+
+ LOGGER.exit("shut down raw message listening");
+ }
+
+ /**
+ * This method is called when a message is received on a web socket and is to be forwarded to a listener.
+ *
+ * @param data the message data containing a message
+ */
+ @Override
+ public void onMessage(final MessageBlock<MESSAGE> data) {
+ throw new UnsupportedOperationException("this operation is not supported");
+ }
+
+ /**
+ * Register a data forwarder to which messages coming in on the web socket will be forwarded.
+ *
+ * @param listener The listener to register
+ */
+ @Override
+ public void registerDataForwarder(final MessageListener<MESSAGE> listener) {
+ stateCheck(listener);
+ dataHandler.registerMessageHandler(listener);
+ }
+
+ /**
+ * Unregister a data forwarder that was previously registered on the web socket listener.
+ *
+ * @param listener The listener to unregister
+ */
+ @Override
+ public void unRegisterDataForwarder(final MessageListener<MESSAGE> listener) {
+ stateCheck(listener);
+ dataHandler.unRegisterMessageHandler(listener);
+ }
+
+ /**
+ * Sanity check for the listener and data handler.
+ *
+ * @param listener the listener to check
+ */
+ private void stateCheck(final MessageListener<MESSAGE> listener) {
+ if (listener == null) {
+ throw new IllegalArgumentException("The listener object cannot be null");
+ }
+ if (dataHandler == null) {
+ throw new IllegalStateException("Data handler not initialized");
+ }
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/WebSocketMessageListener.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/WebSocketMessageListener.java
new file mode 100644
index 000000000..aa951b4ec
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/WebSocketMessageListener.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws;
+
+import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.RawMessageBlock;
+
+/**
+ * The listener interface for receiving webSocketMessage events. The class that is interested in processing a
+ * webSocketMessage event implements this interface, and the object created with that class is registered with a
+ * component using the component's addWebSocketMessageListener method. When the webSocketMessage event occurs, that
+ * object's appropriate method is invoked.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ * @param <MESSAGE> the generic type
+ * @see RawMessageBlock
+ */
+public interface WebSocketMessageListener<MESSAGE> extends MessageListener<MESSAGE>, Runnable {
+
+ /**
+ * This method is called by the class with which this message listener has been registered.
+ *
+ * @param incomingData the data forwarded by the message reception class
+ */
+ void onMessage(RawMessageBlock incomingData);
+
+ /**
+ * Register a data forwarder to which messages coming in on the web socket will be forwarded.
+ *
+ * @param listener The listener to register
+ */
+ void registerDataForwarder(MessageListener<MESSAGE> listener);
+
+ /**
+ * Unregister a data forwarder that was previously registered on the web socket listener.
+ *
+ * @param listener The listener to unregister
+ */
+ void unRegisterDataForwarder(MessageListener<MESSAGE> listener);
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/InternalMessageBusClient.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/InternalMessageBusClient.java
new file mode 100644
index 000000000..9f7f89d8c
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/InternalMessageBusClient.java
@@ -0,0 +1,131 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.client;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+
+import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.RawMessageHandler;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlockHandler;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.RawMessageBlock;
+import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class InternalMessageBusClient handles the client side of a web socket and handles the callback mechanism used to
+ * receive messages on the web socket.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ * @param <MESSAGE> the generic type of message being handled
+ */
+abstract class InternalMessageBusClient<MESSAGE> extends WebSocketClientImpl {
+ private static final int THREAD_FACTORY_STACK_SIZE = 256;
+
+ // The logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(InternalMessageBusClient.class);
+
+ // Name of the event bus.
+ private static final String RAW_EVENT_BUS = "Raw-Event-Bus";
+
+ // This instance handles the raw data received from the web socket
+ private final RawMessageHandler<MESSAGE> rawMessageHandler = new RawMessageHandler<>();
+
+ // The message block handler to which to pass messages coming in on this client
+ private MessageBlockHandler<MESSAGE> messageBlockHandler = null;
+
+ // The raw message handler uses a thread to process incoming events off a queue, this class owns and controls that
+ // thread. These fields hold the thread and
+ // the thread factory for creating threads.
+ private ApplicationThreadFactory tFactory =
+ new ApplicationThreadFactory("ws-client-thread", THREAD_FACTORY_STACK_SIZE);
+ private Thread forwarderThread = null;
+
+ /**
+ * Construct the class and start the forwarding thread for received messages.
+ *
+ * @param serverUri the server URI to connect to
+ */
+ InternalMessageBusClient(final URI serverUri) {
+ // Call the super class to create the web socket
+ super(serverUri);
+ LOGGER.entry(serverUri.toString());
+
+ // Create the data handler for forwarding messages
+ messageBlockHandler = new MessageBlockHandler<>(RAW_EVENT_BUS);
+ messageBlockHandler.registerMessageHandler(rawMessageHandler);
+
+ // Create the thread that manages the queue in the data handler
+ forwarderThread = tFactory.newThread(rawMessageHandler);
+ forwarderThread.start();
+
+ LOGGER.exit();
+ }
+
+ /**
+ * Callback for binary messages received from the remote host.
+ *
+ * @param rawMessage the received raw message
+ * @see org.java_websocket.client.WebSocketClient#onMessage(java.nio.ByteBuffer)
+ */
+ @Override
+ public void onMessage(final ByteBuffer rawMessage) {
+ // Post the message to the data handler for forwarding to its listeners
+ messageBlockHandler.post(new RawMessageBlock(rawMessage, null));
+ }
+
+ /**
+ * Callback for binary messages received from the remote host.
+ *
+ * @param stringMessage the string message
+ * @see org.java_websocket.client.WebSocketClient#onMessage(java.lang.String)
+ */
+ @Override
+ public final void onMessage(final String stringMessage) {
+ messageBlockHandler.post(stringMessage);
+ }
+
+ /**
+ * Register a subscriber class to the raw message handler.
+ *
+ * @param listener a simple class, that listens for the events from Event
+ */
+ public void addMessageListener(final MessageListener<MESSAGE> listener) {
+ rawMessageHandler.registerDataForwarder(listener);
+ }
+
+ /**
+ * Removes the message listener.
+ *
+ * @param listener the listener
+ */
+ public void removeMessageListener(final MessageListener<MESSAGE> listener) {
+ rawMessageHandler.unRegisterDataForwarder(listener);
+ }
+
+ /**
+ * Stop the thread handling message forwarding.
+ */
+ protected void stopListener() {
+ rawMessageHandler.shutdown();
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java
new file mode 100644
index 000000000..4a756d6f0
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java
@@ -0,0 +1,162 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.client;
+
+import java.net.URI;
+
+import org.java_websocket.WebSocket;
+import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
+import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
+import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+
+/**
+ * The Class MessagingClient is the class that wraps web socket handling, message sending, and message reception on the
+ * client side of a web socket in Apex.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ * @param <MESSAGE> the generic type
+ */
+public class MessagingClient<MESSAGE> extends InternalMessageBusClient<MESSAGE> implements MessagingService<MESSAGE> {
+ // The length of time to wait for a connection to a web socket server before aborting
+ private static final int CONNECTION_TIMEOUT_TIME_MS = 3000;
+
+ // The length of time to wait before checking if a connection to a web socket server has worked or not
+ private static final int CONNECTION_TRY_INTERVAL_MS = 100;
+
+ /**
+ * Constructor of this class, uses its {@link InternalMessageBusClient} superclass to set up the web socket and
+ * handle incoming message forwarding.
+ *
+ * @param serverUri The URI of the service
+ */
+ public MessagingClient(final URI serverUri) {
+ // Call the super class to create the web socket and set up received message forwarding
+ super(serverUri);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#stopConnection()
+ */
+ @Override
+ public void stopConnection() {
+ // Stop message reception in the super class
+ super.stopListener();
+
+ // Close the web socket
+ final WebSocket connection = super.getConnection();
+ if (connection != null && connection.isOpen()) {
+ connection.closeConnection(0, "");
+ }
+ this.close();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#startConnection()
+ */
+ @Override
+ public void startConnection() {
+ // Open the web socket
+ final WebSocket connection = super.getConnection();
+ if (connection != null && !connection.isOpen()) {
+ connect();
+ }
+
+ if (!waitforConnection(connection)) {
+ throw new IllegalStateException("Could not connect to the server");
+ }
+ }
+
+ /**
+ * This method waits for the timeout value for the client to connect to the web socket server.
+ *
+ * @param connection the connection to wait on
+ * @return true, if successful
+ */
+ private boolean waitforConnection(final WebSocket connection) {
+ // The total time we have before timeout
+ int timeoutMSCounter = CONNECTION_TIMEOUT_TIME_MS;
+
+ // Check the connection state
+ do {
+ switch (connection.getReadyState()) {
+ case NOT_YET_CONNECTED:
+ case CONNECTING:
+ case CLOSING:
+ // Not connected yet so wait for the try interval
+ ThreadUtilities.sleep(CONNECTION_TRY_INTERVAL_MS);
+ timeoutMSCounter -= CONNECTION_TRY_INTERVAL_MS;
+ break;
+ case OPEN:
+ // Connection is open, happy days
+ return true;
+ case CLOSED:
+ // Connection is closed, bah
+ return false;
+ default:
+ break;
+ }
+ }
+ // While the timeout value has not expired
+ while (timeoutMSCounter > 0);
+
+ // We have timed out
+ return false;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core.
+ * infrastructure. messaging.MessageHolder)
+ */
+ @Override
+ public void send(final MessageHolder<MESSAGE> commands) {
+ // Get the connection and send the message
+ final WebSocket connection = super.getConnection();
+ connection.send(MessagingUtils.serializeObject(commands));
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
+ */
+ @Override
+ public void send(final String messageString) {
+ final WebSocket connection = super.getConnection();
+ connection.send(messageString);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#isStarted()
+ */
+ @Override
+ public boolean isStarted() {
+ return getConnection().isOpen();
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/WebSocketClientImpl.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/WebSocketClientImpl.java
new file mode 100644
index 000000000..b2e0953c7
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/WebSocketClientImpl.java
@@ -0,0 +1,83 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.client;
+
+import java.net.URI;
+
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.handshake.ServerHandshake;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This class implements {@link WebSocketClient} specific methods in order to act as a Java Web Socket client.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ */
+abstract class WebSocketClientImpl extends WebSocketClient {
+ // The logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(WebSocketClientImpl.class);
+
+ /**
+ * Constructs a WebSocketClient instance and sets it to the connect to the specified URI. The channel does not
+ * attempt to connect automatically. You must call {@link connect} first to initiate the socket connection.
+ *
+ * @param serverUri the URI of the web socket server to connect to
+ */
+ WebSocketClientImpl(final URI serverUri) {
+ super(serverUri);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.java_websocket.client.WebSocketClient#onOpen(org.java_websocket.handshake.ServerHandshake)
+ */
+ @Override
+ public void onOpen(final ServerHandshake handshakedata) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Connection opened to server {} --> {}", this.getURI(), handshakedata.getHttpStatusMessage());
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.java_websocket.client.WebSocketClient#onClose(int, java.lang.String, boolean)
+ */
+ @Override
+ public void onClose(final int code, final String reason, final boolean remote) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Connection closed to server {} --> code \"{}\", reason \"{}\"", this.getURI(), code, reason);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.java_websocket.client.WebSocketClient#onError(java.lang.Exception)
+ */
+ @Override
+ public void onError(final Exception ex) {
+ LOGGER.info("Failed to make a connection to the server {} ", getURI());
+ LOGGER.catching(ex);
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/package-info.java
new file mode 100644
index 000000000..d5344fe14
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Provides the client side of messaging over web sockets.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.client;
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlock.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlock.java
new file mode 100644
index 000000000..70b1d2c3a
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlock.java
@@ -0,0 +1,70 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock;
+
+import java.util.List;
+
+import org.java_websocket.WebSocket;
+
+/**
+ * This class encapsulate messages and the web socket on which they are handled.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ * @param <MESSAGE> the generic type of message being handled
+ */
+public final class MessageBlock<MESSAGE> {
+
+ // List of Messages received on a web socket
+ private final List<MESSAGE> messages;
+
+ // The web socket on which the messages are handled
+ private final WebSocket webSocket;
+
+ /**
+ * Instantiates a new message block.
+ *
+ * @param messages the messages in the message block
+ * @param webSocket the web socket used to handle the message block
+ */
+ public MessageBlock(final List<MESSAGE> messages, final WebSocket webSocket) {
+ this.messages = messages;
+ this.webSocket = webSocket;
+ }
+
+ /**
+ * Gets the messages.
+ *
+ * @return the messages
+ */
+ public List<MESSAGE> getMessages() {
+ return messages;
+ }
+
+ /**
+ * Gets the web socket.
+ *
+ * @return the web socket
+ */
+ public WebSocket getConnection() {
+ return webSocket;
+ }
+
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java
new file mode 100644
index 000000000..4265718db
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java
@@ -0,0 +1,128 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock;
+
+import com.google.common.eventbus.EventBus;
+
+import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This class is used to pass messages received on a Java web socket to listening application class instances using an
+ * event bus.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ * @param <MESSAGE> the generic type
+ */
+public class MessageBlockHandler<MESSAGE> {
+ // Logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessageBlockHandler.class);
+
+ /**
+ * This event bus will forward the events to all of its subscribers.
+ */
+ private EventBus eventBus = null;
+
+ /**
+ * Instantiates a new data handler.
+ *
+ * @param eventBusName the name of the event bus for this message block handler
+ */
+ public MessageBlockHandler(final String eventBusName) {
+ eventBus = new EventBus(eventBusName);
+ LOGGER.trace("message bus {} created ", eventBusName);
+ }
+
+ /**
+ * Post a raw message block on the data handler event bus of this class.
+ *
+ * @param rawMessageBlock the block containing raw messages
+ */
+ public void post(final RawMessageBlock rawMessageBlock) {
+ if (rawMessageBlock.getMessage() != null) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("new raw message recieved from {}", rawMessageBlock.getConn() == null ? "server"
+ : rawMessageBlock.getConn().getRemoteSocketAddress().getHostName());
+ }
+ eventBus.post(rawMessageBlock);
+ }
+ }
+
+ /**
+ * Post a block of typed messages on the data handler event bus of this class.
+ *
+ * @param messageBlock the block containing typed messages
+ */
+ public void post(final MessageBlock<MESSAGE> messageBlock) {
+ if (messageBlock.getMessages() != null) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("new data message recieved from {}", messageBlock.getConnection() == null ? "server"
+ : messageBlock.getConnection().getRemoteSocketAddress().getHostName());
+ }
+ eventBus.post(messageBlock);
+ }
+ }
+
+ /**
+ * Post a string message on the data handler event bus of this class.
+ *
+ * @param messageString the string message
+ */
+ public void post(final String messageString) {
+ if (messageString != null) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("new string message recieved from server: " + messageString);
+ }
+ eventBus.post(messageString);
+ }
+ }
+
+ /**
+ * Register a listener to event bus.
+ *
+ * @param listener is an instance of WebSocketMessageListener
+ */
+ public void registerMessageHandler(final MessageListener<MESSAGE> listener) {
+ LOGGER.entry(listener);
+ if (listener == null) {
+ throw new IllegalArgumentException("listener object cannot be null");
+ }
+ eventBus.register(listener);
+ LOGGER.debug("message listener {} is registered with forwarder", listener);
+ LOGGER.exit();
+ }
+
+ /**
+ * Remove the listener subscribed to the event bus.
+ *
+ * @param listener the listener
+ */
+ public void unRegisterMessageHandler(final MessageListener<MESSAGE> listener) {
+ if (listener == null) {
+ throw new IllegalArgumentException("listener object cannot be null");
+ }
+ LOGGER.entry(listener);
+ eventBus.unregister(listener);
+ LOGGER.trace(" message listener {} unregistered from forwarder", listener);
+ LOGGER.exit();
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/RawMessageBlock.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/RawMessageBlock.java
new file mode 100644
index 000000000..3fa25b56b
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/RawMessageBlock.java
@@ -0,0 +1,67 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock;
+
+import java.nio.ByteBuffer;
+
+import org.java_websocket.WebSocket;
+
+/**
+ * A container for a raw message block and the connection on which it is handled.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ */
+public final class RawMessageBlock {
+ // The raw message
+ private final ByteBuffer message;
+
+ // The web socket on which the message is handled
+ private final WebSocket webSocket;
+
+ /**
+ * Constructor, instantiate the bean.
+ *
+ * @param message {@link ByteBuffer} message from the web socket
+ * @param webSocket {@link WebSocket} the web socket on which the message is handled
+ */
+ public RawMessageBlock(final ByteBuffer message, final WebSocket webSocket) {
+ this.message = message;
+ this.webSocket = webSocket;
+ }
+
+ /**
+ * A getter method for message.
+ *
+ * @return the message
+ */
+ public ByteBuffer getMessage() {
+ return message;
+ }
+
+ /**
+ * A getter method for the web socket.
+ *
+ * @return the web socket
+ */
+ public WebSocket getConn() {
+ return webSocket;
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/package-info.java
new file mode 100644
index 000000000..c01c88aed
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Pass blocks of messages on Web Sockets to clients using an event bus.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock;
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/package-info.java
new file mode 100644
index 000000000..c8811817b
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Provides a Web Service implementation of the Messaging interfaces.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws;
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java
new file mode 100644
index 000000000..8e65bbf98
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java
@@ -0,0 +1,134 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+
+import org.java_websocket.WebSocket;
+import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
+import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.RawMessageHandler;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlockHandler;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.RawMessageBlock;
+import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class InternalMessageBusServer handles the server side of a web socket and handles the callback mechanism used to
+ * receive messages on the web socket.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ * @param <MESSAGE> the generic type
+ */
+abstract class InternalMessageBusServer<MESSAGE> extends WebSocketServerImpl implements MessagingService<MESSAGE> {
+ // Logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(InternalMessageBusServer.class);
+
+ private static final int THREAD_FACTORY_STACK_SIZE = 256;
+
+ // Name of the event bus.
+ private static final String RAW_EVENT_BUS = "Raw-Event-Bus";
+
+ // This instance handles the raw data received from the web socket
+ private final RawMessageHandler<MESSAGE> rawMessageHandler = new RawMessageHandler<>();
+
+ // The message block handler to which to pass messages coming in on this client
+ private MessageBlockHandler<MESSAGE> messageBlockHandler = null;
+
+ // The raw message handler uses a thread to process incoming events off a queue, this class owns and controls that
+ // thread. These fields hold the thread and
+ // the thread factory for creating threads.
+ private ApplicationThreadFactory tFactory =
+ new ApplicationThreadFactory("ws-server-thread", THREAD_FACTORY_STACK_SIZE);
+ private Thread forwarderThread = null;
+
+ /**
+ * Construct the class and start the forwarding thread for received messages.
+ *
+ * @param address the address of the server machine
+ */
+ protected InternalMessageBusServer(final InetSocketAddress address) {
+ // Call the super class to create the web socket
+ super(address);
+ LOGGER.entry(address.getAddress().getHostAddress() + ":" + address.getPort());
+
+ // Create the data handler for forwarding messages
+ messageBlockHandler = new MessageBlockHandler<>(RAW_EVENT_BUS);
+ messageBlockHandler.registerMessageHandler(rawMessageHandler);
+
+ // Create the thread that manages the queue in the data handler
+ forwarderThread = tFactory.newThread(rawMessageHandler);
+ forwarderThread.start();
+
+ LOGGER.exit();
+ }
+
+ /**
+ * Callback for binary messages received from the remote host.
+ *
+ * @param webSocket the web socket on which the raw message was received
+ * @param rawMessage the received raw message
+ * @see #onMessage(WebSocket, String)
+ */
+ @Override
+ public void onMessage(final WebSocket webSocket, final ByteBuffer rawMessage) {
+ messageBlockHandler.post(new RawMessageBlock(rawMessage, webSocket));
+ }
+
+ /**
+ * Register a subscriber class to the raw message handler.
+ *
+ * @param subscriber the subscriber
+ */
+ @Override
+ public void addMessageListener(final MessageListener<MESSAGE> subscriber) {
+ rawMessageHandler.registerDataForwarder(subscriber);
+ }
+
+ /**
+ * Removes the message listener.
+ *
+ * @param subscriber the subscriber
+ */
+ @Override
+ public void removeMessageListener(final MessageListener<MESSAGE> subscriber) {
+ rawMessageHandler.unRegisterDataForwarder(subscriber);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.java_websocket.server.WebSocketServer#onMessage(org.java_websocket.WebSocket, java.lang.String)
+ */
+ @Override
+ public void onMessage(final WebSocket webSocket, final String stringMessage) {
+ messageBlockHandler.post(stringMessage);
+ }
+
+ /**
+ * Stop the thread handling message forwarding.
+ */
+ protected void stopListener() {
+ rawMessageHandler.shutdown();
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java
new file mode 100644
index 000000000..fc401576f
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java
@@ -0,0 +1,161 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
+import org.java_websocket.WebSocket;
+import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder;
+import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * A messaging server implementation using web socket.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ * @param <MESSAGE> the generic type of message being passed
+ */
+public class MessageServerImpl<MESSAGE> extends InternalMessageBusServer<MESSAGE> {
+ // The logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessageServerImpl.class);
+
+ // The Web Socket protocol for URIs and URLs
+ private static final String PROTOCOL = "ws://";
+
+ // URI of this server
+ private final String connectionURI;
+
+ // Indicates if the web socket server is started or not
+ private boolean isStarted = false;
+
+ /**
+ * Instantiates a new web socket messaging server for Apex.
+ *
+ * @param address the address of the server machine on which to start the server
+ */
+ public MessageServerImpl(final InetSocketAddress address) {
+ // Call the super class to create the web socket and set up received message forwarding
+ super(address);
+ LOGGER.entry(address);
+
+ // Compose the Web Socket URI
+ connectionURI = PROTOCOL + address.getHostString() + ":" + address.getPort();
+ LOGGER.debug("Server connection URI: {}", connectionURI);
+
+ LOGGER.exit();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.java_websocket.server.WebSocketServer#start()
+ */
+ @Override
+ public void startConnection() {
+ // Start reception of connections on the web socket
+ start();
+ isStarted = true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.java_websocket.server.WebSocketServer#stop()
+ */
+ @Override
+ public void stopConnection() {
+ // Stop message listening using our super class
+ stopListener();
+
+ // Stop the web socket server
+ try {
+ // Close all connections on this web socket server
+ for (final WebSocket connection : connections()) {
+ connection.closeConnection(0, "");
+ }
+ stop();
+ } catch (final IOException ioe) {
+ LOGGER.catching(ioe);
+ } catch (final InterruptedException e) {
+ // This can happen in normal operation so ignore
+ }
+ isStarted = false;
+ }
+
+ /**
+ * This method returns the current connection URI , if the server started otherwise it throws
+ * {@link IllegalStateException}.
+ *
+ * @return connection URI
+ */
+ public String getConnectionURI() {
+ if (connectionURI == null) {
+ throw new IllegalStateException("URI not set - The server is not started");
+ }
+ return connectionURI;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core.
+ * infrastructure. messaging.MessageHolder)
+ */
+ @Override
+ public void send(final MessageHolder<MESSAGE> message) {
+ // Send the incoming message to all clients connected to this web socket
+ final Collection<WebSocket> connections = connections();
+ for (final WebSocket webSocket : connections) {
+ webSocket.send(MessagingUtils.serializeObject(message));
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String)
+ */
+ @Override
+ public void send(final String messageString) {
+ final Collection<WebSocket> connections = connections();
+ for (final WebSocket webSocket : connections) {
+ webSocket.send(messageString);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#isStarted()
+ */
+ @Override
+ public boolean isStarted() {
+ return isStarted;
+ }
+
+ @Override
+ public void onStart() {
+ LOGGER.debug("started deployment server on URI: {}", connectionURI);
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/WebSocketServerImpl.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/WebSocketServerImpl.java
new file mode 100644
index 000000000..26acfe70c
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/WebSocketServerImpl.java
@@ -0,0 +1,89 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server;
+
+import java.net.InetSocketAddress;
+
+import org.java_websocket.WebSocket;
+import org.java_websocket.handshake.ClientHandshake;
+import org.java_websocket.server.WebSocketServer;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This class is the web socket server specific implementation for Apex.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ */
+abstract class WebSocketServerImpl extends WebSocketServer {
+ // The logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessageServerImpl.class);
+
+ /**
+ * Constructor of this class.
+ *
+ * @param address host address of the local machine.
+ */
+ protected WebSocketServerImpl(final InetSocketAddress address) {
+ super(address);
+ LOGGER.entry(address.getAddress().getHostAddress() + ":" + address.getPort());
+ LOGGER.exit();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.java_websocket.server.WebSocketServer#onOpen(org.java_websocket.WebSocket ,
+ * org.java_websocket.handshake.ClientHandshake)
+ */
+ @Override
+ public void onOpen(final WebSocket conn, final ClientHandshake handshake) {
+ LOGGER.entry(conn, handshake);
+ LOGGER.debug("A client connection opened from machine {}.",
+ conn.getRemoteSocketAddress().getAddress().getHostAddress());
+ LOGGER.exit();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.java_websocket.server.WebSocketServer#onClose(org.java_websocket. WebSocket, int, java.lang.String,
+ * boolean)
+ */
+ @Override
+ public void onClose(final WebSocket conn, final int code, final String reason, final boolean remote) {
+ LOGGER.entry(conn, code, remote);
+ LOGGER.debug("A client connection from machine {} closing with code {}.",
+ conn.getRemoteSocketAddress().getAddress().getHostAddress(), code);
+ LOGGER.exit();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.java_websocket.server.WebSocketServer#onError(org.java_websocket.WebSocket, java.lang.Exception)
+ */
+ @Override
+ public void onError(final WebSocket conn, final Exception ex) {
+ // some errors like port binding failed may not be assignable to a specific web socket
+ LOGGER.error("server error occurred", ex);
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/package-info.java
new file mode 100644
index 000000000..0a7235b05
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Provides the server side of messaging over web sockets.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server;
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/package-info.java
new file mode 100644
index 000000000..adb17da04
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Provides support for passing messages as POJOs and as strings over Web Sockets.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging;
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageClient.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageClient.java
new file mode 100644
index 000000000..00ade8047
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageClient.java
@@ -0,0 +1,147 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.stringmessaging;
+
+import com.google.common.eventbus.Subscribe;
+
+import java.net.URI;
+
+import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
+import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
+import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
+import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This class uses a web socket client to send and receive strings over a web socket.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class WSStringMessageClient implements WSStringMessager {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(WSStringMessageClient.class);
+
+ // Message service factory and the message service itself
+ private final MessagingServiceFactory<String> factory = new MessagingServiceFactory<>();
+ private MessagingService<String> service = null;
+
+ // The listener to use for reception of strings
+ private WSStringMessageListener wsStringMessageListener;
+
+ // Address of the server
+ private final String host;
+ private final int port;
+ private String uriString;
+
+ /**
+ * Constructor, define the host and port of the server to connect to.
+ *
+ * @param host the host of the server
+ * @param port the port of the server
+ */
+ public WSStringMessageClient(final String host, final int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageSender#start(org.onap.policy.
+ * apex. core.infrastructure.messaging. stringmessaging.WSStringMessageListener)
+ */
+ @Override
+ public void start(final WSStringMessageListener newWsStringMessageListener) throws MessagingException {
+ this.wsStringMessageListener = newWsStringMessageListener;
+
+ uriString = "ws://" + host + ":" + port;
+ LOGGER.entry("web socket event consumer client to \"" + uriString + "\" starting . . .");
+
+ try {
+ service = factory.createClient(new URI(uriString));
+ service.addMessageListener(new WSStringMessageClientListener());
+ service.startConnection();
+ } catch (final Exception e) {
+ LOGGER.warn("web socket event consumer client to \"" + uriString + "\" start failed", e);
+ throw new MessagingException("web socket event consumer client to \"" + uriString + "\" start failed", e);
+ }
+
+ LOGGER.exit("web socket event consumer client to \"" + uriString + "\" started");
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageSender#stop()
+ */
+ @Override
+ public void stop() {
+ LOGGER.entry("web socket event consumer client to \"" + uriString + "\" stopping . . .");
+ service.stopConnection();
+ LOGGER.exit("web socket event consumer client to \"" + uriString + "\" stopped");
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageSender#sendString(java.lang.
+ * String)
+ */
+ @Override
+ public void sendString(final String stringMessage) {
+ service.send(stringMessage);
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("message sent to server: " + stringMessage);
+ }
+ }
+
+ /**
+ * The Class WSStringMessageClientListener.
+ */
+ private class WSStringMessageClientListener implements MessageListener<String> {
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.policy.apex.core.
+ * infrastructure.messaging.impl.ws.messageblock. MessageBlock)
+ */
+ @Subscribe
+ @Override
+ public void onMessage(final MessageBlock<String> messageBlock) {
+ throw new UnsupportedOperationException("raw messages are not supported on string message clients");
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.String)
+ */
+ @Subscribe
+ @Override
+ public void onMessage(final String messageString) {
+ wsStringMessageListener.receiveString(messageString);
+ }
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageListener.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageListener.java
new file mode 100644
index 000000000..e524b43d7
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageListener.java
@@ -0,0 +1,36 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.stringmessaging;
+
+/**
+ * This interface is used to call back the owner of a String Web socket message server or client.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public interface WSStringMessageListener {
+
+ /**
+ * Receive a string coming off a web socket.
+ *
+ * @param stringMessage the string message
+ */
+ void receiveString(String stringMessage);
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageServer.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageServer.java
new file mode 100644
index 000000000..4da478f6a
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageServer.java
@@ -0,0 +1,150 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.stringmessaging;
+
+import com.google.common.eventbus.Subscribe;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.onap.policy.apex.core.infrastructure.messaging.MessageListener;
+import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
+import org.onap.policy.apex.core.infrastructure.messaging.MessagingService;
+import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory;
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
+import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This class runs a web socket server for sending and receiving of strings over a web socket.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class WSStringMessageServer implements WSStringMessager {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(WSStringMessageServer.class);
+
+ // Message service factory and the message service itself
+ private final MessagingServiceFactory<String> factory = new MessagingServiceFactory<>();
+ private MessagingService<String> service = null;
+
+ // The listener to use for reception of strings
+ private WSStringMessageListener wsStringMessageListener;
+
+ // Address of the server
+ private final int port;
+
+ /**
+ * Constructor, define the port of the server.
+ *
+ * @param port the port of the server
+ */
+ public WSStringMessageServer(final int port) {
+ this.port = port;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageSender#start(org.onap.policy.
+ * apex. core.infrastructure.messaging. stringmessaging.WSStringMessageListener)
+ */
+ @Override
+ public void start(final WSStringMessageListener newWsStringMessageListener) throws MessagingException {
+ this.wsStringMessageListener = newWsStringMessageListener;
+
+ LOGGER.entry("web socket event consumer server starting . . .");
+
+ try {
+ final InetAddress addrLan = MessagingUtils.getLocalHostLANAddress();
+ LOGGER.debug("web socket string message server LAN address=" + addrLan.getHostAddress());
+ final InetAddress addr = InetAddress.getLocalHost();
+ LOGGER.debug("web socket string message server host address=" + addr.getHostAddress());
+
+ service = factory.createServer(new InetSocketAddress(port));
+ service.addMessageListener(new WSStringMessageServerListener());
+
+ service.startConnection();
+ } catch (final Exception e) {
+ LOGGER.warn("web socket string message server start failed", e);
+ throw new MessagingException("web socket string message start failed", e);
+ }
+
+ LOGGER.exit("web socket string message server started");
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageSender#stop()
+ */
+ @Override
+ public void stop() {
+ LOGGER.entry("web socket string message server stopping . . .");
+ service.stopConnection();
+ LOGGER.exit("web socket string message server stopped");
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageSender#sendString(java.lang.
+ * String)
+ */
+ @Override
+ public void sendString(final String stringMessage) {
+ service.send(stringMessage);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("server sent message: " + stringMessage);
+ }
+ }
+
+ /**
+ * The listener for strings coming into the server.
+ */
+ private class WSStringMessageServerListener implements MessageListener<String> {
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.policy.apex.core.
+ * infrastructure.messaging.impl.ws.messageblock. MessageBlock)
+ */
+ @Subscribe
+ @Override
+ public void onMessage(final MessageBlock<String> messageBlock) {
+ throw new UnsupportedOperationException("raw messages are not supported on string message clients");
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.String)
+ */
+ @Subscribe
+ @Override
+ public void onMessage(final String messageString) {
+ wsStringMessageListener.receiveString(messageString);
+ }
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessager.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessager.java
new file mode 100644
index 000000000..a2781e932
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessager.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.stringmessaging;
+
+import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
+
+/**
+ * This interface is used to call a String Web socket message server or client to send a string.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public interface WSStringMessager {
+
+ /**
+ * Start the string message sender.
+ *
+ * @param wsStringMessageListener the listener to use for listening for string messages
+ * @throws MessagingException the messaging exception
+ */
+ void start(WSStringMessageListener wsStringMessageListener) throws MessagingException;
+
+ /**
+ * Stop the string messaging sender.
+ */
+ void stop();
+
+ /**
+ * Send a string on a web socket.
+ *
+ * @param stringMessage the string message to send
+ */
+ void sendString(String stringMessage);
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/package-info.java
new file mode 100644
index 000000000..a8e679c70
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Provides string messaging over Web Sockets.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.stringmessaging;
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java
new file mode 100644
index 000000000..d15f86c8a
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java
@@ -0,0 +1,260 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Enumeration;
+
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class MessagingUtils is a class with static methods used in IPC messaging for finding free ports, translating
+ * host names to addresses, serializing objects and flushing object streams.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ */
+public final class MessagingUtils {
+ // The port number of the lowest user port, ports 0-1023 are system ports
+ private static final int LOWEST_USER_PORT = 1024;
+
+ // Logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessagingUtils.class);
+
+ /**
+ * Private constructor used to prevent sub class instantiation.
+ */
+ private MessagingUtils() {}
+
+ /**
+ * This method searches the availability of the port, if the requested port not available, this method will throw an
+ * exception.
+ *
+ * @param port the port to check
+ * @return the port verified as being free
+ * @throws RuntimeException on port allocation errors
+ */
+ public static int checkPort(final int port) {
+ LOGGER.entry("Checking availability of port {}", port);
+
+ Socket s = null;
+ try {
+ // Try to connect to the port, if we can connect then the port is occupied
+ s = new Socket("localhost", port);
+ LOGGER.debug("Port {} is not available", port);
+
+ throw new RuntimeException("could not allocate requested port: " + port);
+ } catch (final IOException e) {
+ // We found a free port
+ LOGGER.debug("Port {} is available ", port);
+ return port;
+ } finally {
+ // Close the socket used to check if the port was free
+ if (s != null) {
+ try {
+ s.close();
+ } catch (final IOException e) {
+ LOGGER.catching(e);
+ LOGGER.warn("could not allocate requested port " + port, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * This method searches the availability of the port, if the requested port not available,this method will increment
+ * the port number and check the availability of that port, this process will continue until it find port available.
+ *
+ * @param port the first port to check
+ * @return the port that was found
+ * @throws RuntimeException on port allocation errors
+ */
+ public static int findPort(final int port) {
+ LOGGER.entry("Checking availability of port {}", port);
+
+ Socket s = null;
+ try {
+ // Try to connect to the port, if we can connect then the port is occupied
+ s = new Socket("localhost", port);
+ LOGGER.debug("Port {} is not available", port);
+
+ // Recurse and try the next port
+ return findPort(port + 1);
+ } catch (final IOException e) {
+ // We found a free port
+ LOGGER.debug("Port {} is available ", port);
+ return port;
+ } finally {
+ // Close the socket used to check if the port was free
+ if (s != null) {
+ try {
+ s.close();
+ } catch (final IOException e) {
+ LOGGER.catching(e);
+ LOGGER.warn("could not allocate requested port " + port, e);
+ throw new RuntimeException("could not allocate requested port " + port, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the local host address.
+ *
+ * @return the local host address
+ */
+ public static InetAddress getHost() {
+ try {
+ return InetAddress.getLocalHost();
+ } catch (final UnknownHostException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * This method searches the availability of the port, if the requested port not available,this method will increment
+ * the port number and check the availability, this process will continue until it find port available.
+ *
+ * @param port the first port to check
+ * @return the port that was found
+ * @throws RuntimeException on port allocation errors
+ */
+ public static int allocateAddress(final int port) {
+ if (port < LOWEST_USER_PORT) {
+ throw new IllegalArgumentException("The port " + port + " is already in use");
+ }
+ return MessagingUtils.findPort(port);
+ }
+
+ /**
+ * Get an Internet Address for the local host.
+ *
+ * @return an Internet address
+ * @throws UnknownHostException if the address of the local host cannot be found
+ */
+ public static InetAddress getLocalHostLANAddress() throws UnknownHostException {
+ try {
+ InetAddress candidateAddress = null;
+ // Iterate all NICs (network interface cards)...
+ for (final Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces(); ifaces
+ .hasMoreElements();) {
+ final NetworkInterface iface = ifaces.nextElement();
+ // Iterate all IP addresses assigned to each card...
+ for (final Enumeration<InetAddress> inetAddrs = iface.getInetAddresses(); inetAddrs
+ .hasMoreElements();) {
+ final InetAddress inetAddr = inetAddrs.nextElement();
+ if (!inetAddr.isLoopbackAddress()) {
+
+ if (inetAddr.isSiteLocalAddress()) {
+ // Found non-loopback site-local address. Return it
+ // immediately...
+ return inetAddr;
+ } else if (candidateAddress == null) {
+ // Found non-loopback address, but not
+ // necessarily site-local.
+ // Store it as a candidate to be returned if
+ // site-local address is not subsequently
+ // found...
+ candidateAddress = inetAddr;
+ // Note that we don't repeatedly assign
+ // non-loopback non-site-local addresses as
+ // candidates,
+ // only the first. For subsequent iterations,
+ // candidate will be non-null.
+ }
+ }
+ }
+ }
+ if (candidateAddress != null) {
+ // We did not find a site-local address, but we found some other
+ // non-loopback address.
+ // Server might have a non-site-local address assigned to its
+ // NIC (or it might be running
+ // IPv6 which deprecates the "site-local" concept).
+ // Return this non-loopback candidate address...
+ return candidateAddress;
+ }
+ // At this point, we did not find a non-loopback address.
+ // Fall back to returning whatever InetAddress.getLocalHost()
+ // returns...
+ final InetAddress jdkSuppliedAddress = InetAddress.getLocalHost();
+ if (jdkSuppliedAddress == null) {
+ throw new UnknownHostException("The JDK InetAddress.getLocalHost() method unexpectedly returned null.");
+ }
+ return jdkSuppliedAddress;
+ } catch (final Exception e) {
+ final UnknownHostException unknownHostException =
+ new UnknownHostException("Failed to determine LAN address: " + e);
+ unknownHostException.initCause(e);
+ throw unknownHostException;
+ }
+ }
+
+ /**
+ * This method serializes the message holder objects.
+ *
+ * @param object the object
+ * @return byte[]
+ */
+ public static byte[] serializeObject(final Object object) {
+ LOGGER.entry(object.getClass().getName());
+ final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ ObjectOutputStream oos = null;
+ try {
+ oos = new ObjectOutputStream(bytesOut);
+ oos.writeObject(object);
+ } catch (final IOException e) {
+ LOGGER.warn("error on object serialization", e);
+ } finally {
+ flushAndClose(oos, bytesOut);
+ }
+ final byte[] bytes = bytesOut.toByteArray();
+ return bytes;
+ }
+
+ /**
+ * Flush and close an object stream and a byte array output stream.
+ *
+ * @param oos the object output stream
+ * @param bytesOut the byte array output stream
+ */
+ private static void flushAndClose(final ObjectOutputStream oos, final ByteArrayOutputStream bytesOut) {
+ try {
+ if (oos != null) {
+ oos.flush();
+ oos.close();
+ }
+ if (bytesOut != null) {
+ bytesOut.close();
+ }
+
+ } catch (final IOException e) {
+ LOGGER.error("Failed to close the Srialization operation");
+ LOGGER.catching(e);
+ }
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/package-info.java
new file mode 100644
index 000000000..ccb12f2dd
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Contains utility classes for messaging using sockets and web sockets.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging.util;
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/package-info.java
new file mode 100644
index 000000000..8bd0a093f
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Provides infrastructure and utility functions for use by other classes and modules in APEX.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+
+package org.onap.policy.apex.core.infrastructure;
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ApplicationThreadFactory.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ApplicationThreadFactory.java
new file mode 100644
index 000000000..45579c7ba
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ApplicationThreadFactory.java
@@ -0,0 +1,142 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.threading;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class provides a thread factory for use by classes that require thread factories to handle concurrent operation.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ */
+public class ApplicationThreadFactory implements ThreadFactory {
+ private static final String HYPHEN = "-";
+ private static final String APPLICATION_NAME = "Apex-";
+ private static final AtomicInteger NEXT_POOL_NUMBER = new AtomicInteger();
+ private final ThreadGroup group;
+ private final AtomicInteger nextThreadNumber = new AtomicInteger();
+ private final String name;
+ private final long stackSize;
+ private final int threadPriority;
+
+ /**
+ * Instantiates a new application thread factory with a default stack size and normal thread priority.
+ *
+ * @param nameLocal the name local
+ */
+ public ApplicationThreadFactory(final String nameLocal) {
+ this(nameLocal, 0);
+ }
+
+ /**
+ * Instantiates a new application thread factory with a default normal thread priority.
+ *
+ * @param nameLocal the name local
+ * @param stackSize the stack size
+ */
+ public ApplicationThreadFactory(final String nameLocal, final long stackSize) {
+ this(nameLocal, stackSize, Thread.NORM_PRIORITY);
+ }
+
+ /**
+ * Instantiates a new application thread factory with a specified thread priority.
+ *
+ * @param nameLocal the name local
+ * @param stackSize the stack size
+ * @param threadPriority the thread priority
+ */
+ public ApplicationThreadFactory(final String nameLocal, final long stackSize, final int threadPriority) {
+ final SecurityManager s = System.getSecurityManager();
+ group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+ name = APPLICATION_NAME + nameLocal + HYPHEN + NEXT_POOL_NUMBER.getAndIncrement();
+ this.stackSize = stackSize;
+ this.threadPriority = threadPriority;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.concurrent.ThreadFactory#newThread(java.lang.Runnable)
+ */
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread thisThread;
+ if (stackSize > 0) {
+ thisThread = new Thread(group, r, name + ':' + nextThreadNumber.getAndIncrement(), stackSize);
+ } else {
+ thisThread = new Thread(group, r, name + ':' + nextThreadNumber.getAndIncrement());
+ }
+ if (thisThread.isDaemon()) {
+ thisThread.setDaemon(false);
+ }
+ thisThread.setPriority(threadPriority);
+
+ return thisThread;
+ }
+
+ /**
+ * Stop group threads.
+ */
+ public void stopGroupThreads() {
+ group.interrupt();
+ group.list();
+
+ }
+
+ /**
+ * Gets the name of the thread factory.
+ *
+ * @return the name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Gets the stack size of the threads created by this thread factory.
+ *
+ * @return the stack size
+ */
+ public long getStackSize() {
+ return stackSize;
+ }
+
+ /**
+ * Gets the thread priority of the threads created by this thread factory.
+ *
+ * @return the thread priority
+ */
+ public int getThreadPriority() {
+ return threadPriority;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return "ApplicationThreadFactory [nextPollNumber=" + NEXT_POOL_NUMBER + ",nextThreadNumber=" + nextThreadNumber
+ + ", name=" + name + ", stackSize=" + stackSize + ", threadPriority=" + threadPriority + "]";
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java
new file mode 100644
index 000000000..56b903f38
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java
@@ -0,0 +1,50 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.threading;
+
+/**
+ * This class is a helper class for carrying out common threading tasks.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public final class ThreadUtilities {
+
+ /**
+ * Private constructor to prevent sub-classing of this class.
+ */
+ private ThreadUtilities() {}
+
+ /**
+ * Sleeps for the specified number of milliseconds, hiding interrupt handling.
+ *
+ * @param milliseconds the milliseconds
+ * @return true, if successful
+ */
+ public static boolean sleep(final long milliseconds) {
+ try {
+ Thread.sleep(milliseconds);
+ } catch (final InterruptedException e) {
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/package-info.java
new file mode 100644
index 000000000..dc0b9ee40
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Provides factories and utility functions for threads.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+
+package org.onap.policy.apex.core.infrastructure.threading;
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/XPathReader.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/XPathReader.java
new file mode 100644
index 000000000..f677e6b4a
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/XPathReader.java
@@ -0,0 +1,115 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.xml;
+
+import java.io.InputStream;
+
+import javax.xml.namespace.QName;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+import org.w3c.dom.Document;
+
+/**
+ * A generic class for applying the XPATH queries on XML files.
+ *
+ * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
+ */
+public class XPathReader {
+ // Logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(XPathReader.class);
+
+ private String xmlFileName = null;
+ private InputStream xmlStream = null;
+ private Document xmlDocument;
+ private XPath xPath;
+
+ /**
+ * Construct Reader for the file passed in.
+ *
+ * @param xmlFileName the xml file name
+ */
+ public XPathReader(final String xmlFileName) {
+ this.xmlFileName = xmlFileName;
+ init();
+ }
+
+ /**
+ * Construct Reader for the stream passed in.
+ *
+ * @param xmlStream a stream of XML
+ */
+ public XPathReader(final InputStream xmlStream) {
+ this.xmlStream = xmlStream;
+ init();
+ }
+
+ /**
+ * Initialise the x-path reader.
+ */
+ private void init() {
+ try {
+ LOGGER.info("Initializing XPath reader");
+
+ // Check if this is operating on a file
+ if (xmlFileName != null) {
+ xmlDocument = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(xmlFileName);
+ }
+ // Check if this is operating on a stream
+ else if (xmlStream != null) {
+ xmlDocument = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(xmlStream);
+
+ }
+ // We have an error
+ else {
+ LOGGER.error("XPath reader not initialized with either a file or a stream");
+ return;
+ }
+
+ xPath = XPathFactory.newInstance().newXPath();
+ LOGGER.info("Initialized XPath reader");
+ } catch (final Exception ex) {
+ LOGGER.error("Error parsing XML file/stream from XPath reading, reason :\n" + ex.getMessage());
+ }
+ }
+
+ /**
+ * Read items from the file using xpath.
+ *
+ * @param expression x-path expression
+ * @param returnType XML node Set
+ * @return last node collected
+ */
+ public Object read(final String expression, final QName returnType) {
+ try {
+ final XPathExpression xPathExpression = xPath.compile(expression);
+ return xPathExpression.evaluate(xmlDocument, returnType);
+ } catch (final XPathExpressionException ex) {
+ LOGGER.error("Failed to read XML file for XPath processing, reason:\n" + ex.getMessage());
+ return null;
+ }
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/package-info.java
new file mode 100644
index 000000000..5631bd15c
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Provides utility XML classes for use by other classes and modules in APEX.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+
+package org.onap.policy.apex.core.infrastructure.xml;
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassBuilder.java b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassBuilder.java
new file mode 100644
index 000000000..5765eee01
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassBuilder.java
@@ -0,0 +1,133 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.core.infrastructure.java.compile.singleclass;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.tools.Diagnostic;
+import javax.tools.DiagnosticCollector;
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileObject;
+import javax.tools.ToolProvider;
+
+import org.onap.policy.apex.core.infrastructure.java.JavaHandlingException;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class SingleClassBuilder is used to compile the Java code for a Java object and to create an instance of the
+ * object.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class SingleClassBuilder {
+ // Logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(SingleClassBuilder.class);
+
+ // The class name and source code for the class that we are compiling and instantiating
+ private final String className;
+ private final String sourceCode;
+
+ // This specialized JavaFileManager handles class loading for the single Java class
+ private SingleFileManager singleFileManager = null;
+
+ /**
+ * Instantiates a new single class builder.
+ *
+ * @param className the class name
+ * @param sourceCode the source code
+ */
+ public SingleClassBuilder(final String className, final String sourceCode) {
+ // Save the fields of the class
+ this.className = className;
+ this.sourceCode = sourceCode;
+ }
+
+ /**
+ * Compile the single class into byte code.
+ *
+ * @throws JavaHandlingException Thrown on compilation errors or handling errors on the single Java class
+ */
+ public void compile() throws JavaHandlingException {
+ // Get the list of compilation units, there is only one here
+ final List<? extends JavaFileObject> compilationUnits =
+ Arrays.asList(new SingleClassCompilationUnit(className, sourceCode));
+
+ // Allows us to get diagnostics from the compilation
+ final DiagnosticCollector<JavaFileObject> diagnosticListener = new DiagnosticCollector<>();
+
+ // Get the Java compiler
+ final JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+
+ // Set up the target file manager and call the compiler
+ singleFileManager = new SingleFileManager(compiler, new SingleClassByteCodeFileObject(className));
+ final JavaCompiler.CompilationTask task =
+ compiler.getTask(null, singleFileManager, diagnosticListener, null, null, compilationUnits);
+
+ // Check if the compilation worked
+ if (!task.call()) {
+ final StringBuilder builder = new StringBuilder();
+ for (final Diagnostic<? extends JavaFileObject> diagnostic : diagnosticListener.getDiagnostics()) {
+ builder.append("code:");
+ builder.append(diagnostic.getCode());
+ builder.append(", kind:");
+ builder.append(diagnostic.getKind());
+ builder.append(", position:");
+ builder.append(diagnostic.getPosition());
+ builder.append(", start position:");
+ builder.append(diagnostic.getStartPosition());
+ builder.append(", end position:");
+ builder.append(diagnostic.getEndPosition());
+ builder.append(", source:");
+ builder.append(diagnostic.getSource());
+ builder.append(", message:");
+ builder.append(diagnostic.getMessage(null));
+ builder.append("\n");
+ }
+
+ LOGGER.warn("error compiling Java code for class \"" + className + "\": " + builder.toString());
+ throw new JavaHandlingException(
+ "error compiling Java code for class \"" + className + "\": " + builder.toString());
+ }
+ }
+
+ /**
+ * Create a new instance of the Java class using its byte code definition.
+ *
+ * @return A new instance of the object
+ * @throws InstantiationException if an instance of the object cannot be created, for example if the class has no
+ * default constructor
+ * @throws IllegalAccessException the caller does not have permission to call the class
+ * @throws ClassNotFoundException the byte code for the class is not found in the class loader
+ * @throws JavaHandlingException the java handling exception if the Java class source code is not compiled
+ */
+ public Object createObject()
+ throws InstantiationException, IllegalAccessException, ClassNotFoundException, JavaHandlingException {
+ if (singleFileManager == null) {
+ LOGGER.warn("error instantiating instance for class \"" + className + "\": code may not be compiled");
+ throw new JavaHandlingException(
+ "error instantiating instance for class \"" + className + "\": code may not be compiled");
+ }
+
+ return singleFileManager.getClassLoader(null).findClass(className).newInstance();
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassByteCodeFileObject.java b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassByteCodeFileObject.java
new file mode 100644
index 000000000..c0a2ef7a1
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassByteCodeFileObject.java
@@ -0,0 +1,89 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.core.infrastructure.java.compile.singleclass;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+
+import javax.tools.SimpleJavaFileObject;
+
+/**
+ * The Class SingleClassByteCodeFileObject is a specialization of the {@link SimpleJavaFileObject} class, which is
+ * itself an implementation of the {@code JavaFileObject} interface, which provides a file abstraction for tools
+ * operating on Java programming language source and class files. The {@link SimpleJavaFileObject} class provides simple
+ * implementations for most methods in {@code JavaFileObject}. This class is designed to be sub classed and used as a
+ * basis for {@code JavaFileObject} implementations. Subclasses can override the implementation and specification of any
+ * method of this class as long as the general contract of {@code JavaFileObject} is obeyed.
+ *
+ * This class holds the byte code for a single class in memory.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class SingleClassByteCodeFileObject extends SimpleJavaFileObject {
+
+ // The ByteArrayOutputStream holds the byte code for the class
+ private ByteArrayOutputStream byteArrayOutputStream;
+
+ /**
+ * Instantiates the byte code for the class in memory.
+ *
+ * @param className the class name is used to compose a URI for the class
+ */
+ public SingleClassByteCodeFileObject(final String className) {
+ super(URI.create("byte:///" + className + ".class"), Kind.CLASS);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.tools.SimpleJavaFileObject#openOutputStream()
+ */
+ @Override
+ public OutputStream openOutputStream() {
+ // Create the byte array output stream that will hold the byte code for the class, when the class source code is
+ // compiled, this output stream is passed
+ // to the compiler and the byte code for the class is written into the output stream.
+ byteArrayOutputStream = new ByteArrayOutputStream();
+ return byteArrayOutputStream;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.tools.SimpleJavaFileObject#openInputStream()
+ */
+ @Override
+ public InputStream openInputStream() {
+ // No input stream for streaming out the byte code
+ return null;
+ }
+
+ /**
+ * Gets the byte code of the class.
+ *
+ * @return the byte code of the class
+ */
+ public byte[] getByteCode() {
+ return byteArrayOutputStream.toByteArray();
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassCompilationUnit.java b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassCompilationUnit.java
new file mode 100644
index 000000000..9a001d2c2
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassCompilationUnit.java
@@ -0,0 +1,83 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.core.infrastructure.java.compile.singleclass;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+
+import javax.tools.SimpleJavaFileObject;
+
+/**
+ * The Class SingleClassCompilationUnit is a container for the source code of the single Java class in memory. The class
+ * uses a {@link String} to hold the source code.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class SingleClassCompilationUnit extends SimpleJavaFileObject {
+
+ private final String source;
+
+ /**
+ * Instantiates a new compilation unit.
+ *
+ * @param className the class name for the source code
+ * @param source the source code for the class
+ */
+ public SingleClassCompilationUnit(final String className, final String source) {
+ // Create a URI for the source code of the class
+ super(URI.create("file:///" + className + ".java"), Kind.SOURCE);
+ this.source = source;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.tools.SimpleJavaFileObject#getCharContent(boolean)
+ */
+ @Override
+ public CharSequence getCharContent(final boolean ignoreEncodingErrors) {
+ // Return the source code to toe caller, the compiler
+ return source;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.tools.SimpleJavaFileObject#openOutputStream()
+ */
+ @Override
+ public OutputStream openOutputStream() {
+ throw new IllegalStateException();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.tools.SimpleJavaFileObject#openInputStream()
+ */
+ @Override
+ public InputStream openInputStream() {
+ // Return the source code as a stream
+ return new ByteArrayInputStream(source.getBytes());
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassLoader.java b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassLoader.java
new file mode 100644
index 000000000..befed6d40
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassLoader.java
@@ -0,0 +1,61 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.core.infrastructure.java.compile.singleclass;
+
+/**
+ * The Class SingleClassLoader is responsible for class loading the single Java class being held in memory.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class SingleClassLoader extends ClassLoader {
+ // The byte code of the class held in memory as byte code in a ByteCodeFileObject
+ private final SingleClassByteCodeFileObject byteCodeFileObject;
+
+ /**
+ * Instantiates a new single class loader to load the byte code of the class that is being held in memory.
+ *
+ * @param byteCodeFileObject the byte code of the class
+ */
+ public SingleClassLoader(final SingleClassByteCodeFileObject byteCodeFileObject) {
+ this.byteCodeFileObject = byteCodeFileObject;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.ClassLoader#findClass(java.lang.String)
+ */
+ @Override
+ protected Class<?> findClass(final String className) throws ClassNotFoundException {
+ // Creates a java Class that can be instantiated from the class defined in the byte code in the
+ // ByteCodeFileObejct
+ return defineClass(className, byteCodeFileObject.getByteCode(), 0, byteCodeFileObject.getByteCode().length);
+ }
+
+ /**
+ * Gets the file object.
+ *
+ * @return the file object
+ */
+ SingleClassByteCodeFileObject getFileObject() {
+ return byteCodeFileObject;
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleFileManager.java b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleFileManager.java
new file mode 100644
index 000000000..ce92f8e4d
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleFileManager.java
@@ -0,0 +1,79 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.core.infrastructure.java.compile.singleclass;
+
+import java.io.IOException;
+
+import javax.tools.FileObject;
+import javax.tools.ForwardingJavaFileManager;
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileObject;
+import javax.tools.StandardJavaFileManager;
+
+/**
+ * The Class SingleFileManager is a {@link ForwardingJavaFileManager} which in turn implements {@code JavaFileManager}.
+ * A {@code JavaFileManager} handles source files for Java language handling tools. A {@link ForwardingJavaFileManager}
+ * is an implementation of {@code JavaFileManager} that forwards the {@code JavaFileManager} methods to a given file
+ * manager.
+ *
+ * This class instantiates and forwards those requests to a {@link StandardJavaFileManager} instance to act as a
+ * {@code JavaFileManager} for a Java single file, managing class loading for the class.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class SingleFileManager extends ForwardingJavaFileManager<StandardJavaFileManager> {
+ // THe class loader for our single class
+ private final SingleClassLoader singleClassLoader;
+
+ /**
+ * Instantiates a new single file manager.
+ *
+ * @param compiler the compiler we are using
+ * @param byteCodeFileObject the byte code for the compiled class
+ */
+ public SingleFileManager(final JavaCompiler compiler, final SingleClassByteCodeFileObject byteCodeFileObject) {
+ super(compiler.getStandardFileManager(null, null, null));
+ singleClassLoader = new SingleClassLoader(byteCodeFileObject);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.tools.ForwardingJavaFileManager#getJavaFileForOutput(javax.tools.JavaFileManager.Location,
+ * java.lang.String, javax.tools.JavaFileObject.Kind, javax.tools.FileObject)
+ */
+ @Override
+ public JavaFileObject getJavaFileForOutput(final Location notUsed, final String className,
+ final JavaFileObject.Kind kind, final FileObject sibling) throws IOException {
+ // Return the JavaFileObject to the compiler so that it can write byte code into it
+ return singleClassLoader.getFileObject();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.tools.ForwardingJavaFileManager#getClassLoader(javax.tools.JavaFileManager.Location)
+ */
+ @Override
+ public SingleClassLoader getClassLoader(final Location location) {
+ return singleClassLoader;
+ }
+}
diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/package-info.java
new file mode 100644
index 000000000..5e436818c
--- /dev/null
+++ b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/package-info.java
@@ -0,0 +1,28 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Generates classes from source code by compiling source code and placing the resultant classes on the class path on
+ * the fly.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+
+package org.onap.policy.core.infrastructure.java.compile.singleclass;
diff --git a/core/core-infrastructure/src/main/resources/logback.xml b/core/core-infrastructure/src/main/resources/logback.xml
new file mode 100644
index 000000000..b1d855419
--- /dev/null
+++ b/core/core-infrastructure/src/main/resources/logback.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ============LICENSE_START=======================================================
+ Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+ SPDX-License-Identifier: Apache-2.0
+ ============LICENSE_END=========================================================
+-->
+
+<configuration debug="false">
+
+ <contextName>Apex</contextName>
+ <statusListener class="ch.qos.logback.core.status.OnConsoleStatusListener" />
+ <property name="VAR_LOG" value="/var/log/ericsson/apex/" />
+
+ <!-- USE FOR STD OUT ONLY -->
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <Pattern>%d %contextName [%t] %level %logger{36} - %msg%n</Pattern>
+ </encoder>
+ </appender>
+
+ <root level="info">
+ <appender-ref ref="STDOUT" />
+ </root>
+
+ <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${VAR_LOG}/apex.log</file>
+ <encoder>
+ <pattern>%d %-5relative [procId=${processId}] [%thread] %-5level
+ %logger{26} - %msg %n %ex{full}</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.onap.policy.apex" level="info" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+</configuration>
diff --git a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/EndToEndStringMessagingTest.java b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/EndToEndStringMessagingTest.java
new file mode 100644
index 000000000..e18c327c0
--- /dev/null
+++ b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/EndToEndStringMessagingTest.java
@@ -0,0 +1,87 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.junit.Test;
+import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageClient;
+import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageListener;
+import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageServer;
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class EndToEndMessagingTest.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class EndToEndStringMessagingTest {
+ // Logger for this class
+ private static final XLogger logger = XLoggerFactory.getXLogger(EndToEndStringMessagingTest.class);
+
+ private WSStringMessageServer server;
+ private WSStringMessageClient client;
+
+ private boolean finished = false;
+
+ @Test
+ public void testEndToEndMessaging() throws MessagingException {
+ logger.debug("end to end messaging test starting . . .");
+ server = new WSStringMessageServer(44441);
+ assertNotNull(server);
+ server.start(new WSStringServerMessageListener());
+
+ client = new WSStringMessageClient("localhost", 44441);
+ assertNotNull(client);
+ client.start(new WSStringClientMessageListener());
+
+ client.sendString("Hello, client here");
+
+ while (!finished) {
+ ThreadUtilities.sleep(50);
+ }
+ client.stop();
+
+ server.stop();
+ logger.debug("end to end messaging test finished");
+ }
+
+ private class WSStringServerMessageListener implements WSStringMessageListener {
+ @Override
+ public void receiveString(final String stringMessage) {
+ logger.debug(stringMessage);
+ assertEquals("Hello, client here", stringMessage);
+ server.sendString("Hello back from server");
+ }
+ }
+
+ private class WSStringClientMessageListener implements WSStringMessageListener {
+ @Override
+ public void receiveString(final String stringMessage) {
+ logger.debug(stringMessage);
+ assertEquals("Hello back from server", stringMessage);
+ finished = true;
+ }
+ }
+}
diff --git a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/StringTestServer.java b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/StringTestServer.java
new file mode 100644
index 000000000..09fa62d59
--- /dev/null
+++ b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/StringTestServer.java
@@ -0,0 +1,83 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging;
+
+import static org.junit.Assert.assertNotNull;
+
+import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageListener;
+import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageServer;
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+
+public class StringTestServer {
+ private WSStringMessageServer server;
+
+ public StringTestServer(final int port, long timeToLive) throws MessagingException {
+ System.out.println("StringTestServer starting on port " + port + " for " + timeToLive + " seconds . . .");
+ server = new WSStringMessageServer(port);
+ assertNotNull(server);
+ server.start(new WSStringServerMessageListener());
+
+ System.out.println("StringTestServer started on port " + port + " for " + timeToLive + " seconds");
+
+ for (; timeToLive > 0; timeToLive--) {
+ ThreadUtilities.sleep(1000);
+ }
+
+ server.stop();
+ System.out.println("StringTestServer completed");
+ }
+
+ private class WSStringServerMessageListener implements WSStringMessageListener {
+ @Override
+ public void receiveString(final String stringMessage) {
+ System.out.println("Server received string \"" + stringMessage + "\"");
+ server.sendString("Server echoing back the message: \"" + stringMessage + "\"");
+ }
+ }
+
+ public static void main(final String[] args) throws MessagingException {
+ if (args.length != 2) {
+ System.err.println("Usage: StringTestServer port timeToLive");
+ return;
+ }
+
+ int port = 0;
+ try {
+ port = Integer.parseInt(args[0]);
+ } catch (final Exception e) {
+ System.err.println("Usage: StringTestServer port timeToLive");
+ e.printStackTrace();
+ return;
+ }
+
+ long timeToLive = 0;
+ try {
+ timeToLive = Long.parseLong(args[1]);
+ } catch (final Exception e) {
+ System.err.println("Usage: StringTestServer port timeToLive");
+ e.printStackTrace();
+ return;
+ }
+
+ new StringTestServer(port, timeToLive);
+
+ }
+}
diff --git a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/TestMessageListener.java b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/TestMessageListener.java
new file mode 100644
index 000000000..9e7562b5e
--- /dev/null
+++ b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/TestMessageListener.java
@@ -0,0 +1,65 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.messaging;
+
+import com.google.common.eventbus.Subscribe;
+
+import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The listener interface for receiving testMessage events. The class that is interested in processing a testMessage
+ * event implements this interface, and the object created with that class is registered with a component using the
+ * component's <code>addTestMessageListener</code> method. When the testMessage event occurs, that object's appropriate
+ * method is invoked.
+ *
+ */
+public abstract class TestMessageListener implements MessageListener<String> {
+
+ /** The Constant logger. */
+ private static final XLogger logger = XLoggerFactory.getXLogger(TestMessageListener.class);
+
+ /**
+ * On command.
+ *
+ * @param data the data
+ */
+ public abstract void onCommand(MessageBlock<String> data);
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.policy.apex.core.
+ * infrastructure. messaging.impl.ws.data.Data)
+ */
+ @Subscribe
+ @Override
+ public final void onMessage(final MessageBlock<String> data) {
+ if (data != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} command recieved from machine {} ", data.getMessages().size(),
+ data.getConnection().getRemoteSocketAddress().getHostString());
+ }
+ onCommand(data);
+ }
+ }
+}
diff --git a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTest.java b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTest.java
new file mode 100644
index 000000000..e570ae0c7
--- /dev/null
+++ b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTest.java
@@ -0,0 +1,92 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.threading;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class ThreadingTest.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class ThreadingTest {
+
+ // Logger for this class
+ private static final XLogger logger = XLoggerFactory.getXLogger(ThreadingTest.class);
+
+ /**
+ * Test thread factory initialization.
+ */
+ @Test
+ public void testThreadFactoryInitialization() {
+ final ApplicationThreadFactory threadFactory0 = new ApplicationThreadFactory("localName", 0);
+ assertNotNull("Failed to create ApplicationThreadFactory threadFactory0", threadFactory0);
+ logger.debug(threadFactory0.toString());
+ assertTrue("Failed to name ApplicationThreadFactory threadFactory0",
+ threadFactory0.getName().startsWith("Apex-localName"));
+ final ApplicationThreadFactory threadFactory1 = new ApplicationThreadFactory("localName", 0);
+ assertNotNull("Failed to create ApplicationThreadFactory threadFactory1", threadFactory1);
+ logger.debug(threadFactory1.toString());
+ assertTrue("Failed to name ApplicationThreadFactory threadFactory1",
+ threadFactory1.getName().startsWith("Apex-localName"));
+
+ testThreadFactory(threadFactory0, 0);
+ testThreadFactory(threadFactory1, 1);
+ }
+
+ /**
+ * Test thread factory.
+ *
+ * @param threadFactory the thread factory
+ * @param factoryId the factory id
+ */
+ private void testThreadFactory(final ApplicationThreadFactory threadFactory, final int factoryId) {
+ final List<ThreadingTestThread> threadList = new ArrayList<ThreadingTestThread>();
+
+ for (int i = 0; i < 5; i++) {
+ threadList.add(new ThreadingTestThread());
+ threadList.get(i).setThread(threadFactory.newThread(threadList.get(i)));
+ assertTrue(threadList.get(i).getName().startsWith("Apex-localName"));
+ assertTrue(threadList.get(i).getName().contains(":" + i));
+ threadList.get(i).getThread().start();
+ }
+
+ // Threads should need a little more than 300ms to count to 3
+ ThreadUtilities.sleep(380);
+
+ for (int i = 0; i < 5; i++) {
+ threadList.get(i).interrupt();
+ }
+
+ for (int i = 0; i < 5; i++) {
+ assertTrue("Thread (" + i + ") failed to get count (" + threadList.get(i).getCounter() + ") up to 3",
+ threadList.get(i).getCounter() == 3);
+ }
+ }
+}
diff --git a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTestThread.java b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTestThread.java
new file mode 100644
index 000000000..195072886
--- /dev/null
+++ b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTestThread.java
@@ -0,0 +1,110 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.core.infrastructure.threading;
+
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class ThreadingTestThread.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class ThreadingTestThread implements Runnable {
+
+ // Logger for this class
+ private static final XLogger logger = XLoggerFactory.getXLogger(ThreadingTestThread.class);
+
+ private boolean interrupted = false;
+
+ private long counter = -1;
+
+ private Thread thread = null;
+
+ /**
+ * Sets the thread.
+ *
+ * @param thread the new thread
+ */
+ public void setThread(final Thread thread) {
+ this.thread = thread;
+ }
+
+ /**
+ * Gets the thread.
+ *
+ * @return the thread
+ */
+ public Thread getThread() {
+ return thread;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("starting threading test thread \"" + thread.getName() + "\" . . .");
+ }
+
+ while (!interrupted) {
+ counter++;
+ if (logger.isDebugEnabled()) {
+ logger.debug("in threading test thread \"" + thread.getName() + "\", counter=" + counter + " . . .");
+ }
+
+ if (!ThreadUtilities.sleep(100)) {
+ interrupted = true;
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("stopped threading test thread \"" + thread.getName() + "\"");
+ }
+ }
+
+ /**
+ * Gets the name.
+ *
+ * @return the name
+ */
+ public String getName() {
+ return thread.getName();
+ }
+
+ /**
+ * Interrupt.
+ */
+ public void interrupt() {
+ interrupted = true;
+ }
+
+ /**
+ * Gets the counter.
+ *
+ * @return the counter
+ */
+ public Long getCounter() {
+ return counter;
+ }
+}
diff --git a/core/core-infrastructure/src/test/resources/logback-test.xml b/core/core-infrastructure/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..034511335
--- /dev/null
+++ b/core/core-infrastructure/src/test/resources/logback-test.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ============LICENSE_START=======================================================
+ Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+ SPDX-License-Identifier: Apache-2.0
+ ============LICENSE_END=========================================================
+-->
+
+<configuration>
+
+ <contextName>Apex</contextName>
+ <statusListener class="ch.qos.logback.core.status.OnConsoleStatusListener" />
+ <property name="LOG_DIR" value="${java.io.tmpdir}/apex_logging/" />
+
+ <!-- USE FOR STD OUT ONLY -->
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <Pattern>%d %contextName [%t] %level %logger{36} - %msg%n</Pattern>
+ </encoder>
+ </appender>
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT" />
+ </root>
+
+ <logger name="org.infinispan" level="INFO" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+ <logger name="org.apache.zookeeper.ClientCnxn" level="OFF" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+ <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+ <file>${LOG_DIR}/apex.log</file>
+ <encoder>
+ <pattern>%d %-5relative [procId=${processId}] [%thread] %-5level
+ %logger{26} - %msg %n %ex{full}</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="CTXT_FILE" class="ch.qos.logback.core.FileAppender">
+ <file>${LOG_DIR}/apex_ctxt.log</file>
+ <encoder>
+ <pattern>%d %-5relative [procId=${processId}] [%thread] %-5level
+ %logger{26} - %msg %n %ex{full}</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.onap.policy.apex.core.context.impl.monitoring" level="TRACE" additivity="false">
+ <appender-ref ref="CTXT_FILE" />
+ </logger>
+
+ <logger name="org.onap.policy.apex.core.context" level="INFO" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+</configuration>