From 37d6fd9069eb30d88c4ad80b5f35099ed173cc13 Mon Sep 17 00:00:00 2001 From: ramverma Date: Fri, 1 Jun 2018 11:51:36 +0100 Subject: Adding apex core module to apex-pdp Change-Id: I4bfe1df3e44fe62ff6789e813e59836e267ab3b2 Issue-ID: POLICY-858 Signed-off-by: ramverma --- .../infrastructure/java/JavaHandlingException.java | 58 +++++ .../infrastructure/java/classes/ClassUtils.java | 249 ++++++++++++++++++++ .../infrastructure/java/classes/package-info.java | 27 +++ .../core/infrastructure/java/package-info.java | 27 +++ .../infrastructure/messaging/MessageHolder.java | 168 +++++++++++++ .../infrastructure/messaging/MessageListener.java | 47 ++++ .../messaging/MessagingException.java | 49 ++++ .../infrastructure/messaging/MessagingService.java | 76 ++++++ .../messaging/MessagingServiceFactory.java | 59 +++++ .../messaging/impl/ws/RawMessageHandler.java | 253 ++++++++++++++++++++ .../impl/ws/WebSocketMessageListener.java | 58 +++++ .../impl/ws/client/InternalMessageBusClient.java | 131 +++++++++++ .../messaging/impl/ws/client/MessagingClient.java | 162 +++++++++++++ .../impl/ws/client/WebSocketClientImpl.java | 83 +++++++ .../messaging/impl/ws/client/package-info.java | 27 +++ .../impl/ws/messageblock/MessageBlock.java | 70 ++++++ .../impl/ws/messageblock/MessageBlockHandler.java | 128 ++++++++++ .../impl/ws/messageblock/RawMessageBlock.java | 67 ++++++ .../impl/ws/messageblock/package-info.java | 27 +++ .../messaging/impl/ws/package-info.java | 27 +++ .../impl/ws/server/InternalMessageBusServer.java | 134 +++++++++++ .../impl/ws/server/MessageServerImpl.java | 161 +++++++++++++ .../impl/ws/server/WebSocketServerImpl.java | 89 +++++++ .../messaging/impl/ws/server/package-info.java | 27 +++ .../infrastructure/messaging/package-info.java | 27 +++ .../stringmessaging/WSStringMessageClient.java | 147 ++++++++++++ .../stringmessaging/WSStringMessageListener.java | 36 +++ .../stringmessaging/WSStringMessageServer.java | 150 ++++++++++++ .../stringmessaging/WSStringMessager.java | 51 ++++ .../messaging/stringmessaging/package-info.java | 27 +++ .../messaging/util/MessagingUtils.java | 260 +++++++++++++++++++++ .../messaging/util/package-info.java | 27 +++ .../apex/core/infrastructure/package-info.java | 27 +++ .../threading/ApplicationThreadFactory.java | 142 +++++++++++ .../infrastructure/threading/ThreadUtilities.java | 50 ++++ .../infrastructure/threading/package-info.java | 27 +++ .../apex/core/infrastructure/xml/XPathReader.java | 115 +++++++++ .../apex/core/infrastructure/xml/package-info.java | 27 +++ .../compile/singleclass/SingleClassBuilder.java | 133 +++++++++++ .../singleclass/SingleClassByteCodeFileObject.java | 89 +++++++ .../singleclass/SingleClassCompilationUnit.java | 83 +++++++ .../compile/singleclass/SingleClassLoader.java | 61 +++++ .../compile/singleclass/SingleFileManager.java | 79 +++++++ .../java/compile/singleclass/package-info.java | 28 +++ .../src/main/resources/logback.xml | 50 ++++ .../messaging/EndToEndStringMessagingTest.java | 87 +++++++ .../infrastructure/messaging/StringTestServer.java | 83 +++++++ .../messaging/TestMessageListener.java | 65 ++++++ .../infrastructure/threading/ThreadingTest.java | 92 ++++++++ .../threading/ThreadingTestThread.java | 110 +++++++++ .../src/test/resources/logback-test.xml | 70 ++++++ 51 files changed, 4347 insertions(+) create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/JavaHandlingException.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/ClassUtils.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/package-info.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/package-info.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageHolder.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageListener.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingException.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingService.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingServiceFactory.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/WebSocketMessageListener.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/InternalMessageBusClient.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/WebSocketClientImpl.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/package-info.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlock.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/RawMessageBlock.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/package-info.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/package-info.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/WebSocketServerImpl.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/package-info.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/package-info.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageClient.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageListener.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageServer.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessager.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/package-info.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/package-info.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/package-info.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ApplicationThreadFactory.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/package-info.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/XPathReader.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/package-info.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassBuilder.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassByteCodeFileObject.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassCompilationUnit.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassLoader.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleFileManager.java create mode 100644 core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/package-info.java create mode 100644 core/core-infrastructure/src/main/resources/logback.xml create mode 100644 core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/EndToEndStringMessagingTest.java create mode 100644 core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/StringTestServer.java create mode 100644 core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/TestMessageListener.java create mode 100644 core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTest.java create mode 100644 core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTestThread.java create mode 100644 core/core-infrastructure/src/test/resources/logback-test.xml (limited to 'core/core-infrastructure/src') diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/JavaHandlingException.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/JavaHandlingException.java new file mode 100644 index 000000000..f6ef68105 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/JavaHandlingException.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.java; + +/** + * This class will be called if an error occurs in Java handling. + * + * @author Liam Fallon + */ +public class JavaHandlingException extends Exception { + private static final long serialVersionUID = -6375859029774312663L; + + /** + * Instantiates a new Java handling exception. + * + * @param message the message + */ + public JavaHandlingException(final String message) { + super(message); + } + + /** + * Instantiates a new Java handling exception. + * + * @param e the exception to wrap + */ + public JavaHandlingException(final Exception e) { + super(e); + } + + /** + * Instantiates a new Java handling exception. + * + * @param message the message + * @param e the exception to wrap + */ + public JavaHandlingException(final String message, final Exception e) { + super(message, e); + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/ClassUtils.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/ClassUtils.java new file mode 100644 index 000000000..919d1b122 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/ClassUtils.java @@ -0,0 +1,249 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.java.classes; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.Set; +import java.util.TreeSet; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This class is a utility class used to find Java classes on the class path, in directories, and in Jar files. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public abstract class ClassUtils { + // Get a reference to the logger + private static final XLogger LOGGER = XLoggerFactory.getXLogger(ClassUtils.class); + + // The boot directory in Java for predefined JARs + private static final String SUN_BOOT_LIBRARY_PATH = "sun.boot.library.path"; + + // Token for Classes directory in paths + private static final String CLASSES_TOKEN = "/classes/"; + + // Token for library fragment in path + private static final String LIBRARAY_PATH_TOKEN = "/lib"; + + /** + * Private constructor used to prevent sub class instantiation. + */ + private ClassUtils() {} + + /** + * Get the class names of all classes on the class path. WARNING: This is a heavy call, use sparingly + * + * @return a set of class names for all classes in the class path + */ + public static Set getClassNames() { + // The return set of class names + final Set classNameSet = new TreeSet<>(); + + try { + // The library path for predefined classes in Java + String sunBootLibraryPathString = System.getProperty(SUN_BOOT_LIBRARY_PATH); + + // Check it exists and has a "lib" in it + if (sunBootLibraryPathString != null && sunBootLibraryPathString.contains(LIBRARAY_PATH_TOKEN)) { + // Strip any superfluous trailer from path + sunBootLibraryPathString = sunBootLibraryPathString.substring(0, + sunBootLibraryPathString.lastIndexOf(LIBRARAY_PATH_TOKEN) + LIBRARAY_PATH_TOKEN.length()); + + final File bootLibraryFile = new File(sunBootLibraryPathString); + // The set used to hold class names is populated with predefined Java classes + classNameSet.addAll(processDir(bootLibraryFile, "")); + } + + // Get the entries on the class path + URL[] urls = ((URLClassLoader) ClassLoader.getSystemClassLoader()).getURLs(); + + // Try get the classes in the bootstrap loader + try { + final Class nullclassloader = Class.forName("sun.misc.Launcher"); + if (nullclassloader != null) { + // There a long way and a short way, Short way: causes a warning that cannot be suppressed + // URL[] moreurls = sun.misc.Launcher.getBootstrapClassPath().getURLs(); + // long way: + Method m = nullclassloader.getMethod("getBootstrapClassPath"); + if (m != null) { + final Object cp = m.invoke(null, (Object[]) null); + if (cp != null) { + m = cp.getClass().getMethod("getURLs"); + if (m != null) { + final URL[] moreurls = (URL[]) (m.invoke(cp, (Object[]) null)); + if (moreurls != null && moreurls.length > 0) { + if (urls.length == 0) { + urls = moreurls; + } else { + final URL[] result = Arrays.copyOf(urls, urls.length + moreurls.length); + System.arraycopy(moreurls, 0, result, urls.length, moreurls.length); + urls = result; + } + } + } + } + } + // end long way! + } + } catch (final ClassNotFoundException e) { + LOGGER.warn("Failed to find default path for JRE libraries", e); + } + + // Iterate over the class path entries + for (final URL url : urls) { + if (url == null || url.getFile() == null) { + continue; + } + final File urlFile = new File(url.getFile()); + // Directories may contain ".class" files + if (urlFile.isDirectory()) { + classNameSet.addAll(processDir(urlFile, url.getFile())); + } + // JARs are processed as well + else if (url.getFile().endsWith(".jar")) { + classNameSet.addAll(processJar(urlFile)); + } else { + // It's a resource or some other non-executable thing + continue; + } + } + } catch (final Exception e) { + LOGGER.warn("could not get the names of Java classes", e); + } + + return classNameSet; + } + + /** + * Find all classes in directories and JARs in those directories. + * + * @param classDirectory The directory to search for classes + * @param rootDir The root directory, to be removed from absolute paths + * @return a set of classes which may be empty + * @throws Exception on errors processing directories + */ + public static Set processDir(final File classDirectory, final String rootDir) throws Exception { + // The return set + final TreeSet classNameSet = new TreeSet<>(); + + // Iterate over the directory + if (classDirectory == null || !classDirectory.isDirectory()) { + return classNameSet; + } + for (final File child : classDirectory.listFiles()) { + if (child.isDirectory()) { + // Recurse down + classNameSet.addAll(processDir(child, rootDir)); + } else if (child.getName().endsWith(".jar")) { + // Process the JAR + classNameSet.addAll(processJar(child)); + } else if (child.getName().endsWith(".class") && !child.getName().contains("$")) { + // Process the ".class" file + classNameSet.add( + child.getAbsolutePath().replace(rootDir, "").replaceFirst("\\.class$", "").replace('/', '.')); + } else { + continue; + } + } + return classNameSet; + } + + /** + * Condition the file name as a class name. + * + * @param fileNameIn The file name to convert to a class name + * @return the conditioned class name + */ + public static String processFileName(final String fileNameIn) { + String fileName = fileNameIn; + + if (fileName == null) { + return null; + } + final int classesPos = fileName.indexOf(CLASSES_TOKEN); + + if (classesPos != -1) { + fileName = fileName.substring(classesPos + CLASSES_TOKEN.length()); + } + + return fileName.replaceFirst("\\.class$", "").replace('/', '.'); + } + + /** + * Read all the class names from a Jar. + * + * @param jarFile the JAR file + * @return a set of class names + * @throws Exception on errors processing JARs + */ + public static Set processJar(final File jarFile) throws Exception { + // Pass the file as an input stream + return processJar(new FileInputStream(jarFile.getAbsolutePath())); + } + + /** + * Read all the class names from a Jar. + * + * @param jarInputStream the JAR input stream + * @return a set of class names + * @throws Exception on errors processing JARs + */ + public static Set processJar(final InputStream jarInputStream) throws Exception { + // The return set + final TreeSet classPathSet = new TreeSet<>(); + + if (jarInputStream == null) { + return classPathSet; + } + // JARs are ZIP files + final ZipInputStream zip = new ZipInputStream(jarInputStream); + + // Iterate over each entry in the JAR + for (ZipEntry entry = zip.getNextEntry(); entry != null; entry = zip.getNextEntry()) { + if (!entry.isDirectory() && entry.getName().endsWith(".class") && !entry.getName().contains("$")) { + classPathSet.add(entry.getName().replaceFirst("\\.class$", "").replace('/', '.')); + } + } + zip.close(); + return classPathSet; + } + + /** + * The main method exercises this class for test purposes. + * + * @param args the args + */ + public static void main(final String[] args) { + for (final String clz : getClassNames()) { + System.out.println("Found class: " + clz); + } + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/package-info.java new file mode 100644 index 000000000..c356580b3 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/classes/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Contains support to find Java classes on the class path, in directories and in Jar files. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ + +package org.onap.policy.apex.core.infrastructure.java.classes; diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/package-info.java new file mode 100644 index 000000000..5a8b51132 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/java/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Allows Java classes to be created by compiling Java source code and generating classes on the fly. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ + +package org.onap.policy.apex.core.infrastructure.java; diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageHolder.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageHolder.java new file mode 100644 index 000000000..f74ffa0b3 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageHolder.java @@ -0,0 +1,168 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging; + +import java.io.Serializable; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class MessageHolder holds a set of messages to be sent as a single block of messages in this messaging + * implementation. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + * @param the generic type of message being handled by a message holder instance + */ +public class MessageHolder implements Serializable { + private static final int HASH_PRIME = 31; + private static final int FOUR_BYTES = 32; + + // Serial ID + private static final long serialVersionUID = 1235487535388793719L; + + // Get a reference to the logger + private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessageHolder.class); + + // Properties of the message holder + private final long creationTime; + private final InetAddress senderHostAddress; + + // Sequence of message in the message holder + private final List messages; + + /** + * Constructor, create the message holder. + * + * @param senderHostAddress the host address of the sender of the message holder container + */ + public MessageHolder(final InetAddress senderHostAddress) { + LOGGER.entry(senderHostAddress); + messages = new ArrayList<>(); + this.senderHostAddress = senderHostAddress; + creationTime = System.currentTimeMillis(); + } + + /** + * Return the messages in this message holder. + * + * @return the messages + */ + public List getMessages() { + return messages; + } + + /** + * Adds a message to this message holder. + * + * @param message the message to add + */ + public void addMessage(final MESSAGE message) { + if (!messages.contains(message)) { + messages.add(message); + } else { + LOGGER.warn("duplicate message {} added to message holder", message); + } + } + + /** + * Gets the creation time. + * + * @return the creation time + */ + public long getCreationTime() { + return creationTime; + } + + /** + * Gets the sender host address. + * + * @return the sender host address + */ + public InetAddress getSenderHostAddress() { + return senderHostAddress; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return "ApexCommandProtocol [creationTime=" + creationTime + ", senderHostAddress=" + senderHostAddress + "]"; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = HASH_PRIME; + int result = 1; + result = prime * result + ((senderHostAddress == null) ? 0 : senderHostAddress.hashCode()); + result = prime * result + ((messages == null) ? 0 : messages.hashCode()); + result = prime * result + (int) (creationTime ^ (creationTime >>> FOUR_BYTES)); + return result; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final MessageHolder other = (MessageHolder) obj; + if (senderHostAddress == null) { + if (other.senderHostAddress != null) { + return false; + } + } else if (!senderHostAddress.equals(other.senderHostAddress)) { + return false; + } + if (messages == null) { + if (other.messages != null) { + return false; + } + } else if (!messages.equals(other.messages)) { + return false; + } + if (creationTime != other.creationTime) { + return false; + } + return true; + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageListener.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageListener.java new file mode 100644 index 000000000..c8b132423 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessageListener.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging; + +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock; + +/** + * The listener interface for receiving message events. The class that is interested in processing a message event + * implements this interface. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + * @param of message of any given type that is being listened for and handled + */ +public interface MessageListener { + + /** + * This method is called when a message block is received on a web socket and is to be forwarded to a listener. + * + * @param data the message data containing a message + */ + void onMessage(MessageBlock data); + + /** + * This method is called when a string message is received on a web socket and is to be forwarded to a listener. + * + * @param messageString the message string + */ + void onMessage(String messageString); +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingException.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingException.java new file mode 100644 index 000000000..ef435b2a5 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingException.java @@ -0,0 +1,49 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging; + +/** + * This class will be called if an error occurs in Java handling. + * + * @author Liam Fallon + */ +public class MessagingException extends Exception { + private static final long serialVersionUID = -6375859029774312663L; + + /** + * Instantiates a new messaging exception. + * + * @param message the message + */ + public MessagingException(final String message) { + super(message); + } + + /** + * Instantiates a new messaging exception. + * + * @param message the message + * @param e the e + */ + public MessagingException(final String message, final Exception e) { + super(message, e); + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingService.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingService.java new file mode 100644 index 000000000..7e91b95ea --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingService.java @@ -0,0 +1,76 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging; + +/** + * The Interface MessagingService specifies the methods that must be implemented by any implementation providing Apex + * messaging. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + * @param the type of message being passed by an implementation of Apex messaging + */ +public interface MessagingService { + + /** + * Start the messaging connection. + */ + void startConnection(); + + /** + * Stop the messaging connection. + */ + void stopConnection(); + + /** + * Checks if the messaging connection is started. + * + * @return true, if is started + */ + boolean isStarted(); + + /** + * Send a block of messages on the connection, the messages are contained in the the message holder container. + * + * @param messageHolder The message holder holding the messages to be sent + */ + void send(MessageHolder messageHolder); + + /** + * Send a string message on the connection. + * + * @param messageString The message string to be sent + */ + void send(String messageString); + + /** + * Adds a message listener that will be called when a message is received by this messaging service implementation. + * + * @param messageListener the message listener + */ + void addMessageListener(MessageListener messageListener); + + /** + * Removes the message listener. + * + * @param messageListener the message listener + */ + void removeMessageListener(MessageListener messageListener); +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingServiceFactory.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingServiceFactory.java new file mode 100644 index 000000000..1d08fac74 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/MessagingServiceFactory.java @@ -0,0 +1,59 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging; + +import java.net.InetSocketAddress; +import java.net.URI; + +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.client.MessagingClient; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server.MessageServerImpl; + +/** + * A factory class to create a "server" or "client" type Messaging Service. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + * @param the generic type of message to be handled by this messaging service + */ +public class MessagingServiceFactory { + + /** + * Create a web socket server instance and returns to the caller. + * + * @param address the address of the server machine + * @return the messaging service + */ + public MessagingService createServer(final InetSocketAddress address) { + return new MessageServerImpl<>(address); + } + + /** + * Create a web socket client instance and returns to the caller. + * + * @param uri the URI of the server to connect to + * @return an instance of {@link MessagingService} + */ + public MessagingService createClient(final URI uri) { + if (uri == null) { + throw new IllegalArgumentException("URI cannot be null"); + } + return new MessagingClient<>(uri); + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java new file mode 100644 index 000000000..534bee8af --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/RawMessageHandler.java @@ -0,0 +1,253 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws; + +import com.google.common.eventbus.Subscribe; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder; +import org.onap.policy.apex.core.infrastructure.messaging.MessageListener; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlockHandler; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.RawMessageBlock; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class RawMessageHandler handles raw messages being received on a Java web socket and forwards the messages to the + * DataHandler instance that has subscribed to the RawMessageHandler instance. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + * @param the generic type of message being received + */ +public class RawMessageHandler implements WebSocketMessageListener, Runnable { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(RawMessageHandler.class); + + // The amount of time to sleep during shutdown for the thread of this message handler to stop + private static final int SHUTDOWN_WAIT_TIME = 10; + + // The timeout to wait between queue poll timeouts in milliseconds + private static final long QUEUE_POLL_TIMEOUT = 50; + + // A queue that temporarily holds message blocks + private final BlockingQueue> messageBlockQueue = new LinkedBlockingDeque<>(); + + // A queue that temporarily holds message blocks + private final BlockingQueue stringMessageQueue = new LinkedBlockingDeque<>(); + + // Client applications that have subscribed for messages + private final MessageBlockHandler dataHandler = new MessageBlockHandler("data-processor"); + + // The thread that the raw message handler is receiving messages on + private Thread thisThread = null; + + /** + * This method is called by the class with which this message listener has been registered. + * + * @param incomingData the data forwarded by the message reception class + */ + @Override + @Subscribe + public void onMessage(final RawMessageBlock incomingData) { + // Sanity check and get incoming data + ByteBuffer dataByteBuffer = null; + if (incomingData != null && incomingData.getMessage() != null) { + dataByteBuffer = incomingData.getMessage(); + } else { + return; + } + + // Read the messages from the web socket and place them on the message queue for handling by the queue + // processing thread + ObjectInputStream ois = null; + try { + ois = new ObjectInputStream(new ByteArrayInputStream(dataByteBuffer.array())); + @SuppressWarnings("unchecked") + final MessageHolder messageHolder = (MessageHolder) ois.readObject(); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("message {} recieved from the client {} ", messageHolder.toString(), + messageHolder == null ? "Apex Engine " : messageHolder.getSenderHostAddress()); + } + + final List messages = messageHolder.getMessages(); + if (messages != null) { + messageBlockQueue.add(new MessageBlock(messages, incomingData.getConn())); + } + } catch (IOException | ClassNotFoundException e) { + LOGGER.error("Failed to process message received"); + LOGGER.catching(e); + } finally { + closeObjectStream(ois); + } + } + + /** + * This method is called when a string message is received on a web socket and is to be forwarded to a listener. + * + * @param messageString the message string + */ + @Override + @Subscribe + public void onMessage(final String messageString) { + if (messageString == null) { + return; + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("message {} recieved from the client {} ", messageString); + } + stringMessageQueue.add(messageString); + } + + /** + * Close the {@link ObjectInputStream} stream. + * + * @param ois is an instance of {@link ObjectInputStream} + */ + private void closeObjectStream(final ObjectInputStream ois) { + if (ois != null) { + try { + ois.close(); + } catch (final IOException e) { + LOGGER.catching(e); + } + } + } + + /** + * This thread monitors the message queue and processes messages as they appear on the queue. + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + LOGGER.debug("raw message listening started"); + thisThread = Thread.currentThread(); + + // Run until termination + while (thisThread.isAlive() && !thisThread.isInterrupted()) { + try { + // Read message block messages from the queue and pass it to the data handler + MessageBlock messageBlock = null; + while ((messageBlock = messageBlockQueue.poll(1, TimeUnit.MILLISECONDS)) != null) { + dataHandler.post(messageBlock); + } + } catch (final InterruptedException e) { + LOGGER.debug("raw message listening has been interrupted"); + break; + } + + try { + // Read string messages from the queue and pass it to the data handler + String stringMessage = null; + while ((stringMessage = stringMessageQueue.poll(1, TimeUnit.MILLISECONDS)) != null) { + dataHandler.post(stringMessage); + } + } catch (final InterruptedException e) { + LOGGER.debug("raw message listening has been interrupted"); + break; + } + + // Wait for new messages + try { + Thread.sleep(QUEUE_POLL_TIMEOUT); + } catch (final InterruptedException e) { + LOGGER.debug("raw message listening has been interrupted"); + break; + } + } + + LOGGER.debug("raw message listening stopped"); + } + + /** + * Shutdown the message handler. + */ + public void shutdown() { + LOGGER.entry("shutting down raw message listening . . ."); + + // Interrupt the message handling thread + thisThread.interrupt(); + + // Wait for thread shutdown + while (thisThread.isAlive()) { + ThreadUtilities.sleep(SHUTDOWN_WAIT_TIME); + } + + LOGGER.exit("shut down raw message listening"); + } + + /** + * This method is called when a message is received on a web socket and is to be forwarded to a listener. + * + * @param data the message data containing a message + */ + @Override + public void onMessage(final MessageBlock data) { + throw new UnsupportedOperationException("this operation is not supported"); + } + + /** + * Register a data forwarder to which messages coming in on the web socket will be forwarded. + * + * @param listener The listener to register + */ + @Override + public void registerDataForwarder(final MessageListener listener) { + stateCheck(listener); + dataHandler.registerMessageHandler(listener); + } + + /** + * Unregister a data forwarder that was previously registered on the web socket listener. + * + * @param listener The listener to unregister + */ + @Override + public void unRegisterDataForwarder(final MessageListener listener) { + stateCheck(listener); + dataHandler.unRegisterMessageHandler(listener); + } + + /** + * Sanity check for the listener and data handler. + * + * @param listener the listener to check + */ + private void stateCheck(final MessageListener listener) { + if (listener == null) { + throw new IllegalArgumentException("The listener object cannot be null"); + } + if (dataHandler == null) { + throw new IllegalStateException("Data handler not initialized"); + } + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/WebSocketMessageListener.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/WebSocketMessageListener.java new file mode 100644 index 000000000..aa951b4ec --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/WebSocketMessageListener.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws; + +import org.onap.policy.apex.core.infrastructure.messaging.MessageListener; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.RawMessageBlock; + +/** + * The listener interface for receiving webSocketMessage events. The class that is interested in processing a + * webSocketMessage event implements this interface, and the object created with that class is registered with a + * component using the component's addWebSocketMessageListener method. When the webSocketMessage event occurs, that + * object's appropriate method is invoked. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + * @param the generic type + * @see RawMessageBlock + */ +public interface WebSocketMessageListener extends MessageListener, Runnable { + + /** + * This method is called by the class with which this message listener has been registered. + * + * @param incomingData the data forwarded by the message reception class + */ + void onMessage(RawMessageBlock incomingData); + + /** + * Register a data forwarder to which messages coming in on the web socket will be forwarded. + * + * @param listener The listener to register + */ + void registerDataForwarder(MessageListener listener); + + /** + * Unregister a data forwarder that was previously registered on the web socket listener. + * + * @param listener The listener to unregister + */ + void unRegisterDataForwarder(MessageListener listener); +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/InternalMessageBusClient.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/InternalMessageBusClient.java new file mode 100644 index 000000000..9f7f89d8c --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/InternalMessageBusClient.java @@ -0,0 +1,131 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.client; + +import java.net.URI; +import java.nio.ByteBuffer; + +import org.onap.policy.apex.core.infrastructure.messaging.MessageListener; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.RawMessageHandler; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlockHandler; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.RawMessageBlock; +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class InternalMessageBusClient handles the client side of a web socket and handles the callback mechanism used to + * receive messages on the web socket. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + * @param the generic type of message being handled + */ +abstract class InternalMessageBusClient extends WebSocketClientImpl { + private static final int THREAD_FACTORY_STACK_SIZE = 256; + + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(InternalMessageBusClient.class); + + // Name of the event bus. + private static final String RAW_EVENT_BUS = "Raw-Event-Bus"; + + // This instance handles the raw data received from the web socket + private final RawMessageHandler rawMessageHandler = new RawMessageHandler<>(); + + // The message block handler to which to pass messages coming in on this client + private MessageBlockHandler messageBlockHandler = null; + + // The raw message handler uses a thread to process incoming events off a queue, this class owns and controls that + // thread. These fields hold the thread and + // the thread factory for creating threads. + private ApplicationThreadFactory tFactory = + new ApplicationThreadFactory("ws-client-thread", THREAD_FACTORY_STACK_SIZE); + private Thread forwarderThread = null; + + /** + * Construct the class and start the forwarding thread for received messages. + * + * @param serverUri the server URI to connect to + */ + InternalMessageBusClient(final URI serverUri) { + // Call the super class to create the web socket + super(serverUri); + LOGGER.entry(serverUri.toString()); + + // Create the data handler for forwarding messages + messageBlockHandler = new MessageBlockHandler<>(RAW_EVENT_BUS); + messageBlockHandler.registerMessageHandler(rawMessageHandler); + + // Create the thread that manages the queue in the data handler + forwarderThread = tFactory.newThread(rawMessageHandler); + forwarderThread.start(); + + LOGGER.exit(); + } + + /** + * Callback for binary messages received from the remote host. + * + * @param rawMessage the received raw message + * @see org.java_websocket.client.WebSocketClient#onMessage(java.nio.ByteBuffer) + */ + @Override + public void onMessage(final ByteBuffer rawMessage) { + // Post the message to the data handler for forwarding to its listeners + messageBlockHandler.post(new RawMessageBlock(rawMessage, null)); + } + + /** + * Callback for binary messages received from the remote host. + * + * @param stringMessage the string message + * @see org.java_websocket.client.WebSocketClient#onMessage(java.lang.String) + */ + @Override + public final void onMessage(final String stringMessage) { + messageBlockHandler.post(stringMessage); + } + + /** + * Register a subscriber class to the raw message handler. + * + * @param listener a simple class, that listens for the events from Event + */ + public void addMessageListener(final MessageListener listener) { + rawMessageHandler.registerDataForwarder(listener); + } + + /** + * Removes the message listener. + * + * @param listener the listener + */ + public void removeMessageListener(final MessageListener listener) { + rawMessageHandler.unRegisterDataForwarder(listener); + } + + /** + * Stop the thread handling message forwarding. + */ + protected void stopListener() { + rawMessageHandler.shutdown(); + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java new file mode 100644 index 000000000..4a756d6f0 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/MessagingClient.java @@ -0,0 +1,162 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.client; + +import java.net.URI; + +import org.java_websocket.WebSocket; +import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder; +import org.onap.policy.apex.core.infrastructure.messaging.MessagingService; +import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; + +/** + * The Class MessagingClient is the class that wraps web socket handling, message sending, and message reception on the + * client side of a web socket in Apex. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + * @param the generic type + */ +public class MessagingClient extends InternalMessageBusClient implements MessagingService { + // The length of time to wait for a connection to a web socket server before aborting + private static final int CONNECTION_TIMEOUT_TIME_MS = 3000; + + // The length of time to wait before checking if a connection to a web socket server has worked or not + private static final int CONNECTION_TRY_INTERVAL_MS = 100; + + /** + * Constructor of this class, uses its {@link InternalMessageBusClient} superclass to set up the web socket and + * handle incoming message forwarding. + * + * @param serverUri The URI of the service + */ + public MessagingClient(final URI serverUri) { + // Call the super class to create the web socket and set up received message forwarding + super(serverUri); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#stopConnection() + */ + @Override + public void stopConnection() { + // Stop message reception in the super class + super.stopListener(); + + // Close the web socket + final WebSocket connection = super.getConnection(); + if (connection != null && connection.isOpen()) { + connection.closeConnection(0, ""); + } + this.close(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#startConnection() + */ + @Override + public void startConnection() { + // Open the web socket + final WebSocket connection = super.getConnection(); + if (connection != null && !connection.isOpen()) { + connect(); + } + + if (!waitforConnection(connection)) { + throw new IllegalStateException("Could not connect to the server"); + } + } + + /** + * This method waits for the timeout value for the client to connect to the web socket server. + * + * @param connection the connection to wait on + * @return true, if successful + */ + private boolean waitforConnection(final WebSocket connection) { + // The total time we have before timeout + int timeoutMSCounter = CONNECTION_TIMEOUT_TIME_MS; + + // Check the connection state + do { + switch (connection.getReadyState()) { + case NOT_YET_CONNECTED: + case CONNECTING: + case CLOSING: + // Not connected yet so wait for the try interval + ThreadUtilities.sleep(CONNECTION_TRY_INTERVAL_MS); + timeoutMSCounter -= CONNECTION_TRY_INTERVAL_MS; + break; + case OPEN: + // Connection is open, happy days + return true; + case CLOSED: + // Connection is closed, bah + return false; + default: + break; + } + } + // While the timeout value has not expired + while (timeoutMSCounter > 0); + + // We have timed out + return false; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core. + * infrastructure. messaging.MessageHolder) + */ + @Override + public void send(final MessageHolder commands) { + // Get the connection and send the message + final WebSocket connection = super.getConnection(); + connection.send(MessagingUtils.serializeObject(commands)); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String) + */ + @Override + public void send(final String messageString) { + final WebSocket connection = super.getConnection(); + connection.send(messageString); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#isStarted() + */ + @Override + public boolean isStarted() { + return getConnection().isOpen(); + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/WebSocketClientImpl.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/WebSocketClientImpl.java new file mode 100644 index 000000000..b2e0953c7 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/WebSocketClientImpl.java @@ -0,0 +1,83 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.client; + +import java.net.URI; + +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This class implements {@link WebSocketClient} specific methods in order to act as a Java Web Socket client. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + */ +abstract class WebSocketClientImpl extends WebSocketClient { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(WebSocketClientImpl.class); + + /** + * Constructs a WebSocketClient instance and sets it to the connect to the specified URI. The channel does not + * attempt to connect automatically. You must call {@link connect} first to initiate the socket connection. + * + * @param serverUri the URI of the web socket server to connect to + */ + WebSocketClientImpl(final URI serverUri) { + super(serverUri); + } + + /* + * (non-Javadoc) + * + * @see org.java_websocket.client.WebSocketClient#onOpen(org.java_websocket.handshake.ServerHandshake) + */ + @Override + public void onOpen(final ServerHandshake handshakedata) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Connection opened to server {} --> {}", this.getURI(), handshakedata.getHttpStatusMessage()); + } + } + + /* + * (non-Javadoc) + * + * @see org.java_websocket.client.WebSocketClient#onClose(int, java.lang.String, boolean) + */ + @Override + public void onClose(final int code, final String reason, final boolean remote) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Connection closed to server {} --> code \"{}\", reason \"{}\"", this.getURI(), code, reason); + } + } + + /* + * (non-Javadoc) + * + * @see org.java_websocket.client.WebSocketClient#onError(java.lang.Exception) + */ + @Override + public void onError(final Exception ex) { + LOGGER.info("Failed to make a connection to the server {} ", getURI()); + LOGGER.catching(ex); + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/package-info.java new file mode 100644 index 000000000..d5344fe14 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/client/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Provides the client side of messaging over web sockets. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.client; diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlock.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlock.java new file mode 100644 index 000000000..70b1d2c3a --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlock.java @@ -0,0 +1,70 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock; + +import java.util.List; + +import org.java_websocket.WebSocket; + +/** + * This class encapsulate messages and the web socket on which they are handled. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + * @param the generic type of message being handled + */ +public final class MessageBlock { + + // List of Messages received on a web socket + private final List messages; + + // The web socket on which the messages are handled + private final WebSocket webSocket; + + /** + * Instantiates a new message block. + * + * @param messages the messages in the message block + * @param webSocket the web socket used to handle the message block + */ + public MessageBlock(final List messages, final WebSocket webSocket) { + this.messages = messages; + this.webSocket = webSocket; + } + + /** + * Gets the messages. + * + * @return the messages + */ + public List getMessages() { + return messages; + } + + /** + * Gets the web socket. + * + * @return the web socket + */ + public WebSocket getConnection() { + return webSocket; + } + +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java new file mode 100644 index 000000000..4265718db --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/MessageBlockHandler.java @@ -0,0 +1,128 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock; + +import com.google.common.eventbus.EventBus; + +import org.onap.policy.apex.core.infrastructure.messaging.MessageListener; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This class is used to pass messages received on a Java web socket to listening application class instances using an + * event bus. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + * @param the generic type + */ +public class MessageBlockHandler { + // Logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessageBlockHandler.class); + + /** + * This event bus will forward the events to all of its subscribers. + */ + private EventBus eventBus = null; + + /** + * Instantiates a new data handler. + * + * @param eventBusName the name of the event bus for this message block handler + */ + public MessageBlockHandler(final String eventBusName) { + eventBus = new EventBus(eventBusName); + LOGGER.trace("message bus {} created ", eventBusName); + } + + /** + * Post a raw message block on the data handler event bus of this class. + * + * @param rawMessageBlock the block containing raw messages + */ + public void post(final RawMessageBlock rawMessageBlock) { + if (rawMessageBlock.getMessage() != null) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("new raw message recieved from {}", rawMessageBlock.getConn() == null ? "server" + : rawMessageBlock.getConn().getRemoteSocketAddress().getHostName()); + } + eventBus.post(rawMessageBlock); + } + } + + /** + * Post a block of typed messages on the data handler event bus of this class. + * + * @param messageBlock the block containing typed messages + */ + public void post(final MessageBlock messageBlock) { + if (messageBlock.getMessages() != null) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("new data message recieved from {}", messageBlock.getConnection() == null ? "server" + : messageBlock.getConnection().getRemoteSocketAddress().getHostName()); + } + eventBus.post(messageBlock); + } + } + + /** + * Post a string message on the data handler event bus of this class. + * + * @param messageString the string message + */ + public void post(final String messageString) { + if (messageString != null) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("new string message recieved from server: " + messageString); + } + eventBus.post(messageString); + } + } + + /** + * Register a listener to event bus. + * + * @param listener is an instance of WebSocketMessageListener + */ + public void registerMessageHandler(final MessageListener listener) { + LOGGER.entry(listener); + if (listener == null) { + throw new IllegalArgumentException("listener object cannot be null"); + } + eventBus.register(listener); + LOGGER.debug("message listener {} is registered with forwarder", listener); + LOGGER.exit(); + } + + /** + * Remove the listener subscribed to the event bus. + * + * @param listener the listener + */ + public void unRegisterMessageHandler(final MessageListener listener) { + if (listener == null) { + throw new IllegalArgumentException("listener object cannot be null"); + } + LOGGER.entry(listener); + eventBus.unregister(listener); + LOGGER.trace(" message listener {} unregistered from forwarder", listener); + LOGGER.exit(); + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/RawMessageBlock.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/RawMessageBlock.java new file mode 100644 index 000000000..3fa25b56b --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/RawMessageBlock.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock; + +import java.nio.ByteBuffer; + +import org.java_websocket.WebSocket; + +/** + * A container for a raw message block and the connection on which it is handled. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + */ +public final class RawMessageBlock { + // The raw message + private final ByteBuffer message; + + // The web socket on which the message is handled + private final WebSocket webSocket; + + /** + * Constructor, instantiate the bean. + * + * @param message {@link ByteBuffer} message from the web socket + * @param webSocket {@link WebSocket} the web socket on which the message is handled + */ + public RawMessageBlock(final ByteBuffer message, final WebSocket webSocket) { + this.message = message; + this.webSocket = webSocket; + } + + /** + * A getter method for message. + * + * @return the message + */ + public ByteBuffer getMessage() { + return message; + } + + /** + * A getter method for the web socket. + * + * @return the web socket + */ + public WebSocket getConn() { + return webSocket; + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/package-info.java new file mode 100644 index 000000000..c01c88aed --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/messageblock/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Pass blocks of messages on Web Sockets to clients using an event bus. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock; diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/package-info.java new file mode 100644 index 000000000..c8811817b --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Provides a Web Service implementation of the Messaging interfaces. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws; diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java new file mode 100644 index 000000000..8e65bbf98 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/InternalMessageBusServer.java @@ -0,0 +1,134 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +import org.java_websocket.WebSocket; +import org.onap.policy.apex.core.infrastructure.messaging.MessageListener; +import org.onap.policy.apex.core.infrastructure.messaging.MessagingService; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.RawMessageHandler; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlockHandler; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.RawMessageBlock; +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class InternalMessageBusServer handles the server side of a web socket and handles the callback mechanism used to + * receive messages on the web socket. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + * @param the generic type + */ +abstract class InternalMessageBusServer extends WebSocketServerImpl implements MessagingService { + // Logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(InternalMessageBusServer.class); + + private static final int THREAD_FACTORY_STACK_SIZE = 256; + + // Name of the event bus. + private static final String RAW_EVENT_BUS = "Raw-Event-Bus"; + + // This instance handles the raw data received from the web socket + private final RawMessageHandler rawMessageHandler = new RawMessageHandler<>(); + + // The message block handler to which to pass messages coming in on this client + private MessageBlockHandler messageBlockHandler = null; + + // The raw message handler uses a thread to process incoming events off a queue, this class owns and controls that + // thread. These fields hold the thread and + // the thread factory for creating threads. + private ApplicationThreadFactory tFactory = + new ApplicationThreadFactory("ws-server-thread", THREAD_FACTORY_STACK_SIZE); + private Thread forwarderThread = null; + + /** + * Construct the class and start the forwarding thread for received messages. + * + * @param address the address of the server machine + */ + protected InternalMessageBusServer(final InetSocketAddress address) { + // Call the super class to create the web socket + super(address); + LOGGER.entry(address.getAddress().getHostAddress() + ":" + address.getPort()); + + // Create the data handler for forwarding messages + messageBlockHandler = new MessageBlockHandler<>(RAW_EVENT_BUS); + messageBlockHandler.registerMessageHandler(rawMessageHandler); + + // Create the thread that manages the queue in the data handler + forwarderThread = tFactory.newThread(rawMessageHandler); + forwarderThread.start(); + + LOGGER.exit(); + } + + /** + * Callback for binary messages received from the remote host. + * + * @param webSocket the web socket on which the raw message was received + * @param rawMessage the received raw message + * @see #onMessage(WebSocket, String) + */ + @Override + public void onMessage(final WebSocket webSocket, final ByteBuffer rawMessage) { + messageBlockHandler.post(new RawMessageBlock(rawMessage, webSocket)); + } + + /** + * Register a subscriber class to the raw message handler. + * + * @param subscriber the subscriber + */ + @Override + public void addMessageListener(final MessageListener subscriber) { + rawMessageHandler.registerDataForwarder(subscriber); + } + + /** + * Removes the message listener. + * + * @param subscriber the subscriber + */ + @Override + public void removeMessageListener(final MessageListener subscriber) { + rawMessageHandler.unRegisterDataForwarder(subscriber); + } + + /* + * (non-Javadoc) + * + * @see org.java_websocket.server.WebSocketServer#onMessage(org.java_websocket.WebSocket, java.lang.String) + */ + @Override + public void onMessage(final WebSocket webSocket, final String stringMessage) { + messageBlockHandler.post(stringMessage); + } + + /** + * Stop the thread handling message forwarding. + */ + protected void stopListener() { + rawMessageHandler.shutdown(); + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java new file mode 100644 index 000000000..fc401576f --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/MessageServerImpl.java @@ -0,0 +1,161 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; + +import org.java_websocket.WebSocket; +import org.onap.policy.apex.core.infrastructure.messaging.MessageHolder; +import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * A messaging server implementation using web socket. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + * @param the generic type of message being passed + */ +public class MessageServerImpl extends InternalMessageBusServer { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessageServerImpl.class); + + // The Web Socket protocol for URIs and URLs + private static final String PROTOCOL = "ws://"; + + // URI of this server + private final String connectionURI; + + // Indicates if the web socket server is started or not + private boolean isStarted = false; + + /** + * Instantiates a new web socket messaging server for Apex. + * + * @param address the address of the server machine on which to start the server + */ + public MessageServerImpl(final InetSocketAddress address) { + // Call the super class to create the web socket and set up received message forwarding + super(address); + LOGGER.entry(address); + + // Compose the Web Socket URI + connectionURI = PROTOCOL + address.getHostString() + ":" + address.getPort(); + LOGGER.debug("Server connection URI: {}", connectionURI); + + LOGGER.exit(); + } + + /* + * (non-Javadoc) + * + * @see org.java_websocket.server.WebSocketServer#start() + */ + @Override + public void startConnection() { + // Start reception of connections on the web socket + start(); + isStarted = true; + } + + /* + * (non-Javadoc) + * + * @see org.java_websocket.server.WebSocketServer#stop() + */ + @Override + public void stopConnection() { + // Stop message listening using our super class + stopListener(); + + // Stop the web socket server + try { + // Close all connections on this web socket server + for (final WebSocket connection : connections()) { + connection.closeConnection(0, ""); + } + stop(); + } catch (final IOException ioe) { + LOGGER.catching(ioe); + } catch (final InterruptedException e) { + // This can happen in normal operation so ignore + } + isStarted = false; + } + + /** + * This method returns the current connection URI , if the server started otherwise it throws + * {@link IllegalStateException}. + * + * @return connection URI + */ + public String getConnectionURI() { + if (connectionURI == null) { + throw new IllegalStateException("URI not set - The server is not started"); + } + return connectionURI; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(org.onap.policy.apex.core. + * infrastructure. messaging.MessageHolder) + */ + @Override + public void send(final MessageHolder message) { + // Send the incoming message to all clients connected to this web socket + final Collection connections = connections(); + for (final WebSocket webSocket : connections) { + webSocket.send(MessagingUtils.serializeObject(message)); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#send(java.lang.String) + */ + @Override + public void send(final String messageString) { + final Collection connections = connections(); + for (final WebSocket webSocket : connections) { + webSocket.send(messageString); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessagingService#isStarted() + */ + @Override + public boolean isStarted() { + return isStarted; + } + + @Override + public void onStart() { + LOGGER.debug("started deployment server on URI: {}", connectionURI); + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/WebSocketServerImpl.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/WebSocketServerImpl.java new file mode 100644 index 000000000..26acfe70c --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/WebSocketServerImpl.java @@ -0,0 +1,89 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server; + +import java.net.InetSocketAddress; + +import org.java_websocket.WebSocket; +import org.java_websocket.handshake.ClientHandshake; +import org.java_websocket.server.WebSocketServer; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This class is the web socket server specific implementation for Apex. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + */ +abstract class WebSocketServerImpl extends WebSocketServer { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessageServerImpl.class); + + /** + * Constructor of this class. + * + * @param address host address of the local machine. + */ + protected WebSocketServerImpl(final InetSocketAddress address) { + super(address); + LOGGER.entry(address.getAddress().getHostAddress() + ":" + address.getPort()); + LOGGER.exit(); + } + + /* + * (non-Javadoc) + * + * @see org.java_websocket.server.WebSocketServer#onOpen(org.java_websocket.WebSocket , + * org.java_websocket.handshake.ClientHandshake) + */ + @Override + public void onOpen(final WebSocket conn, final ClientHandshake handshake) { + LOGGER.entry(conn, handshake); + LOGGER.debug("A client connection opened from machine {}.", + conn.getRemoteSocketAddress().getAddress().getHostAddress()); + LOGGER.exit(); + } + + /* + * (non-Javadoc) + * + * @see org.java_websocket.server.WebSocketServer#onClose(org.java_websocket. WebSocket, int, java.lang.String, + * boolean) + */ + @Override + public void onClose(final WebSocket conn, final int code, final String reason, final boolean remote) { + LOGGER.entry(conn, code, remote); + LOGGER.debug("A client connection from machine {} closing with code {}.", + conn.getRemoteSocketAddress().getAddress().getHostAddress(), code); + LOGGER.exit(); + } + + /* + * (non-Javadoc) + * + * @see org.java_websocket.server.WebSocketServer#onError(org.java_websocket.WebSocket, java.lang.Exception) + */ + @Override + public void onError(final WebSocket conn, final Exception ex) { + // some errors like port binding failed may not be assignable to a specific web socket + LOGGER.error("server error occurred", ex); + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/package-info.java new file mode 100644 index 000000000..0a7235b05 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/impl/ws/server/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Provides the server side of messaging over web sockets. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + */ + +package org.onap.policy.apex.core.infrastructure.messaging.impl.ws.server; diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/package-info.java new file mode 100644 index 000000000..adb17da04 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Provides support for passing messages as POJOs and as strings over Web Sockets. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ + +package org.onap.policy.apex.core.infrastructure.messaging; diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageClient.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageClient.java new file mode 100644 index 000000000..00ade8047 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageClient.java @@ -0,0 +1,147 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.stringmessaging; + +import com.google.common.eventbus.Subscribe; + +import java.net.URI; + +import org.onap.policy.apex.core.infrastructure.messaging.MessageListener; +import org.onap.policy.apex.core.infrastructure.messaging.MessagingException; +import org.onap.policy.apex.core.infrastructure.messaging.MessagingService; +import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This class uses a web socket client to send and receive strings over a web socket. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class WSStringMessageClient implements WSStringMessager { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(WSStringMessageClient.class); + + // Message service factory and the message service itself + private final MessagingServiceFactory factory = new MessagingServiceFactory<>(); + private MessagingService service = null; + + // The listener to use for reception of strings + private WSStringMessageListener wsStringMessageListener; + + // Address of the server + private final String host; + private final int port; + private String uriString; + + /** + * Constructor, define the host and port of the server to connect to. + * + * @param host the host of the server + * @param port the port of the server + */ + public WSStringMessageClient(final String host, final int port) { + this.host = host; + this.port = port; + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageSender#start(org.onap.policy. + * apex. core.infrastructure.messaging. stringmessaging.WSStringMessageListener) + */ + @Override + public void start(final WSStringMessageListener newWsStringMessageListener) throws MessagingException { + this.wsStringMessageListener = newWsStringMessageListener; + + uriString = "ws://" + host + ":" + port; + LOGGER.entry("web socket event consumer client to \"" + uriString + "\" starting . . ."); + + try { + service = factory.createClient(new URI(uriString)); + service.addMessageListener(new WSStringMessageClientListener()); + service.startConnection(); + } catch (final Exception e) { + LOGGER.warn("web socket event consumer client to \"" + uriString + "\" start failed", e); + throw new MessagingException("web socket event consumer client to \"" + uriString + "\" start failed", e); + } + + LOGGER.exit("web socket event consumer client to \"" + uriString + "\" started"); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageSender#stop() + */ + @Override + public void stop() { + LOGGER.entry("web socket event consumer client to \"" + uriString + "\" stopping . . ."); + service.stopConnection(); + LOGGER.exit("web socket event consumer client to \"" + uriString + "\" stopped"); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageSender#sendString(java.lang. + * String) + */ + @Override + public void sendString(final String stringMessage) { + service.send(stringMessage); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("message sent to server: " + stringMessage); + } + } + + /** + * The Class WSStringMessageClientListener. + */ + private class WSStringMessageClientListener implements MessageListener { + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.policy.apex.core. + * infrastructure.messaging.impl.ws.messageblock. MessageBlock) + */ + @Subscribe + @Override + public void onMessage(final MessageBlock messageBlock) { + throw new UnsupportedOperationException("raw messages are not supported on string message clients"); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.String) + */ + @Subscribe + @Override + public void onMessage(final String messageString) { + wsStringMessageListener.receiveString(messageString); + } + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageListener.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageListener.java new file mode 100644 index 000000000..e524b43d7 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageListener.java @@ -0,0 +1,36 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.stringmessaging; + +/** + * This interface is used to call back the owner of a String Web socket message server or client. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public interface WSStringMessageListener { + + /** + * Receive a string coming off a web socket. + * + * @param stringMessage the string message + */ + void receiveString(String stringMessage); +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageServer.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageServer.java new file mode 100644 index 000000000..4da478f6a --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessageServer.java @@ -0,0 +1,150 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.stringmessaging; + +import com.google.common.eventbus.Subscribe; + +import java.net.InetAddress; +import java.net.InetSocketAddress; + +import org.onap.policy.apex.core.infrastructure.messaging.MessageListener; +import org.onap.policy.apex.core.infrastructure.messaging.MessagingException; +import org.onap.policy.apex.core.infrastructure.messaging.MessagingService; +import org.onap.policy.apex.core.infrastructure.messaging.MessagingServiceFactory; +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock; +import org.onap.policy.apex.core.infrastructure.messaging.util.MessagingUtils; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This class runs a web socket server for sending and receiving of strings over a web socket. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class WSStringMessageServer implements WSStringMessager { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(WSStringMessageServer.class); + + // Message service factory and the message service itself + private final MessagingServiceFactory factory = new MessagingServiceFactory<>(); + private MessagingService service = null; + + // The listener to use for reception of strings + private WSStringMessageListener wsStringMessageListener; + + // Address of the server + private final int port; + + /** + * Constructor, define the port of the server. + * + * @param port the port of the server + */ + public WSStringMessageServer(final int port) { + this.port = port; + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageSender#start(org.onap.policy. + * apex. core.infrastructure.messaging. stringmessaging.WSStringMessageListener) + */ + @Override + public void start(final WSStringMessageListener newWsStringMessageListener) throws MessagingException { + this.wsStringMessageListener = newWsStringMessageListener; + + LOGGER.entry("web socket event consumer server starting . . ."); + + try { + final InetAddress addrLan = MessagingUtils.getLocalHostLANAddress(); + LOGGER.debug("web socket string message server LAN address=" + addrLan.getHostAddress()); + final InetAddress addr = InetAddress.getLocalHost(); + LOGGER.debug("web socket string message server host address=" + addr.getHostAddress()); + + service = factory.createServer(new InetSocketAddress(port)); + service.addMessageListener(new WSStringMessageServerListener()); + + service.startConnection(); + } catch (final Exception e) { + LOGGER.warn("web socket string message server start failed", e); + throw new MessagingException("web socket string message start failed", e); + } + + LOGGER.exit("web socket string message server started"); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageSender#stop() + */ + @Override + public void stop() { + LOGGER.entry("web socket string message server stopping . . ."); + service.stopConnection(); + LOGGER.exit("web socket string message server stopped"); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageSender#sendString(java.lang. + * String) + */ + @Override + public void sendString(final String stringMessage) { + service.send(stringMessage); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("server sent message: " + stringMessage); + } + } + + /** + * The listener for strings coming into the server. + */ + private class WSStringMessageServerListener implements MessageListener { + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.policy.apex.core. + * infrastructure.messaging.impl.ws.messageblock. MessageBlock) + */ + @Subscribe + @Override + public void onMessage(final MessageBlock messageBlock) { + throw new UnsupportedOperationException("raw messages are not supported on string message clients"); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(java.lang.String) + */ + @Subscribe + @Override + public void onMessage(final String messageString) { + wsStringMessageListener.receiveString(messageString); + } + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessager.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessager.java new file mode 100644 index 000000000..a2781e932 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/WSStringMessager.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.stringmessaging; + +import org.onap.policy.apex.core.infrastructure.messaging.MessagingException; + +/** + * This interface is used to call a String Web socket message server or client to send a string. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public interface WSStringMessager { + + /** + * Start the string message sender. + * + * @param wsStringMessageListener the listener to use for listening for string messages + * @throws MessagingException the messaging exception + */ + void start(WSStringMessageListener wsStringMessageListener) throws MessagingException; + + /** + * Stop the string messaging sender. + */ + void stop(); + + /** + * Send a string on a web socket. + * + * @param stringMessage the string message to send + */ + void sendString(String stringMessage); +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/package-info.java new file mode 100644 index 000000000..a8e679c70 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/stringmessaging/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Provides string messaging over Web Sockets. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ + +package org.onap.policy.apex.core.infrastructure.messaging.stringmessaging; diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java new file mode 100644 index 000000000..d15f86c8a --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/MessagingUtils.java @@ -0,0 +1,260 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging.util; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.Enumeration; + +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class MessagingUtils is a class with static methods used in IPC messaging for finding free ports, translating + * host names to addresses, serializing objects and flushing object streams. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + */ +public final class MessagingUtils { + // The port number of the lowest user port, ports 0-1023 are system ports + private static final int LOWEST_USER_PORT = 1024; + + // Logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(MessagingUtils.class); + + /** + * Private constructor used to prevent sub class instantiation. + */ + private MessagingUtils() {} + + /** + * This method searches the availability of the port, if the requested port not available, this method will throw an + * exception. + * + * @param port the port to check + * @return the port verified as being free + * @throws RuntimeException on port allocation errors + */ + public static int checkPort(final int port) { + LOGGER.entry("Checking availability of port {}", port); + + Socket s = null; + try { + // Try to connect to the port, if we can connect then the port is occupied + s = new Socket("localhost", port); + LOGGER.debug("Port {} is not available", port); + + throw new RuntimeException("could not allocate requested port: " + port); + } catch (final IOException e) { + // We found a free port + LOGGER.debug("Port {} is available ", port); + return port; + } finally { + // Close the socket used to check if the port was free + if (s != null) { + try { + s.close(); + } catch (final IOException e) { + LOGGER.catching(e); + LOGGER.warn("could not allocate requested port " + port, e); + } + } + } + } + + /** + * This method searches the availability of the port, if the requested port not available,this method will increment + * the port number and check the availability of that port, this process will continue until it find port available. + * + * @param port the first port to check + * @return the port that was found + * @throws RuntimeException on port allocation errors + */ + public static int findPort(final int port) { + LOGGER.entry("Checking availability of port {}", port); + + Socket s = null; + try { + // Try to connect to the port, if we can connect then the port is occupied + s = new Socket("localhost", port); + LOGGER.debug("Port {} is not available", port); + + // Recurse and try the next port + return findPort(port + 1); + } catch (final IOException e) { + // We found a free port + LOGGER.debug("Port {} is available ", port); + return port; + } finally { + // Close the socket used to check if the port was free + if (s != null) { + try { + s.close(); + } catch (final IOException e) { + LOGGER.catching(e); + LOGGER.warn("could not allocate requested port " + port, e); + throw new RuntimeException("could not allocate requested port " + port, e); + } + } + } + } + + /** + * Returns the local host address. + * + * @return the local host address + */ + public static InetAddress getHost() { + try { + return InetAddress.getLocalHost(); + } catch (final UnknownHostException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + /** + * This method searches the availability of the port, if the requested port not available,this method will increment + * the port number and check the availability, this process will continue until it find port available. + * + * @param port the first port to check + * @return the port that was found + * @throws RuntimeException on port allocation errors + */ + public static int allocateAddress(final int port) { + if (port < LOWEST_USER_PORT) { + throw new IllegalArgumentException("The port " + port + " is already in use"); + } + return MessagingUtils.findPort(port); + } + + /** + * Get an Internet Address for the local host. + * + * @return an Internet address + * @throws UnknownHostException if the address of the local host cannot be found + */ + public static InetAddress getLocalHostLANAddress() throws UnknownHostException { + try { + InetAddress candidateAddress = null; + // Iterate all NICs (network interface cards)... + for (final Enumeration ifaces = NetworkInterface.getNetworkInterfaces(); ifaces + .hasMoreElements();) { + final NetworkInterface iface = ifaces.nextElement(); + // Iterate all IP addresses assigned to each card... + for (final Enumeration inetAddrs = iface.getInetAddresses(); inetAddrs + .hasMoreElements();) { + final InetAddress inetAddr = inetAddrs.nextElement(); + if (!inetAddr.isLoopbackAddress()) { + + if (inetAddr.isSiteLocalAddress()) { + // Found non-loopback site-local address. Return it + // immediately... + return inetAddr; + } else if (candidateAddress == null) { + // Found non-loopback address, but not + // necessarily site-local. + // Store it as a candidate to be returned if + // site-local address is not subsequently + // found... + candidateAddress = inetAddr; + // Note that we don't repeatedly assign + // non-loopback non-site-local addresses as + // candidates, + // only the first. For subsequent iterations, + // candidate will be non-null. + } + } + } + } + if (candidateAddress != null) { + // We did not find a site-local address, but we found some other + // non-loopback address. + // Server might have a non-site-local address assigned to its + // NIC (or it might be running + // IPv6 which deprecates the "site-local" concept). + // Return this non-loopback candidate address... + return candidateAddress; + } + // At this point, we did not find a non-loopback address. + // Fall back to returning whatever InetAddress.getLocalHost() + // returns... + final InetAddress jdkSuppliedAddress = InetAddress.getLocalHost(); + if (jdkSuppliedAddress == null) { + throw new UnknownHostException("The JDK InetAddress.getLocalHost() method unexpectedly returned null."); + } + return jdkSuppliedAddress; + } catch (final Exception e) { + final UnknownHostException unknownHostException = + new UnknownHostException("Failed to determine LAN address: " + e); + unknownHostException.initCause(e); + throw unknownHostException; + } + } + + /** + * This method serializes the message holder objects. + * + * @param object the object + * @return byte[] + */ + public static byte[] serializeObject(final Object object) { + LOGGER.entry(object.getClass().getName()); + final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + ObjectOutputStream oos = null; + try { + oos = new ObjectOutputStream(bytesOut); + oos.writeObject(object); + } catch (final IOException e) { + LOGGER.warn("error on object serialization", e); + } finally { + flushAndClose(oos, bytesOut); + } + final byte[] bytes = bytesOut.toByteArray(); + return bytes; + } + + /** + * Flush and close an object stream and a byte array output stream. + * + * @param oos the object output stream + * @param bytesOut the byte array output stream + */ + private static void flushAndClose(final ObjectOutputStream oos, final ByteArrayOutputStream bytesOut) { + try { + if (oos != null) { + oos.flush(); + oos.close(); + } + if (bytesOut != null) { + bytesOut.close(); + } + + } catch (final IOException e) { + LOGGER.error("Failed to close the Srialization operation"); + LOGGER.catching(e); + } + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/package-info.java new file mode 100644 index 000000000..ccb12f2dd --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/messaging/util/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Contains utility classes for messaging using sockets and web sockets. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ + +package org.onap.policy.apex.core.infrastructure.messaging.util; diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/package-info.java new file mode 100644 index 000000000..8bd0a093f --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Provides infrastructure and utility functions for use by other classes and modules in APEX. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ + +package org.onap.policy.apex.core.infrastructure; diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ApplicationThreadFactory.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ApplicationThreadFactory.java new file mode 100644 index 000000000..45579c7ba --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ApplicationThreadFactory.java @@ -0,0 +1,142 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.threading; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class provides a thread factory for use by classes that require thread factories to handle concurrent operation. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + */ +public class ApplicationThreadFactory implements ThreadFactory { + private static final String HYPHEN = "-"; + private static final String APPLICATION_NAME = "Apex-"; + private static final AtomicInteger NEXT_POOL_NUMBER = new AtomicInteger(); + private final ThreadGroup group; + private final AtomicInteger nextThreadNumber = new AtomicInteger(); + private final String name; + private final long stackSize; + private final int threadPriority; + + /** + * Instantiates a new application thread factory with a default stack size and normal thread priority. + * + * @param nameLocal the name local + */ + public ApplicationThreadFactory(final String nameLocal) { + this(nameLocal, 0); + } + + /** + * Instantiates a new application thread factory with a default normal thread priority. + * + * @param nameLocal the name local + * @param stackSize the stack size + */ + public ApplicationThreadFactory(final String nameLocal, final long stackSize) { + this(nameLocal, stackSize, Thread.NORM_PRIORITY); + } + + /** + * Instantiates a new application thread factory with a specified thread priority. + * + * @param nameLocal the name local + * @param stackSize the stack size + * @param threadPriority the thread priority + */ + public ApplicationThreadFactory(final String nameLocal, final long stackSize, final int threadPriority) { + final SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + name = APPLICATION_NAME + nameLocal + HYPHEN + NEXT_POOL_NUMBER.getAndIncrement(); + this.stackSize = stackSize; + this.threadPriority = threadPriority; + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.ThreadFactory#newThread(java.lang.Runnable) + */ + @Override + public Thread newThread(final Runnable r) { + final Thread thisThread; + if (stackSize > 0) { + thisThread = new Thread(group, r, name + ':' + nextThreadNumber.getAndIncrement(), stackSize); + } else { + thisThread = new Thread(group, r, name + ':' + nextThreadNumber.getAndIncrement()); + } + if (thisThread.isDaemon()) { + thisThread.setDaemon(false); + } + thisThread.setPriority(threadPriority); + + return thisThread; + } + + /** + * Stop group threads. + */ + public void stopGroupThreads() { + group.interrupt(); + group.list(); + + } + + /** + * Gets the name of the thread factory. + * + * @return the name + */ + public String getName() { + return name; + } + + /** + * Gets the stack size of the threads created by this thread factory. + * + * @return the stack size + */ + public long getStackSize() { + return stackSize; + } + + /** + * Gets the thread priority of the threads created by this thread factory. + * + * @return the thread priority + */ + public int getThreadPriority() { + return threadPriority; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return "ApplicationThreadFactory [nextPollNumber=" + NEXT_POOL_NUMBER + ",nextThreadNumber=" + nextThreadNumber + + ", name=" + name + ", stackSize=" + stackSize + ", threadPriority=" + threadPriority + "]"; + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java new file mode 100644 index 000000000..56b903f38 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/ThreadUtilities.java @@ -0,0 +1,50 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.threading; + +/** + * This class is a helper class for carrying out common threading tasks. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public final class ThreadUtilities { + + /** + * Private constructor to prevent sub-classing of this class. + */ + private ThreadUtilities() {} + + /** + * Sleeps for the specified number of milliseconds, hiding interrupt handling. + * + * @param milliseconds the milliseconds + * @return true, if successful + */ + public static boolean sleep(final long milliseconds) { + try { + Thread.sleep(milliseconds); + } catch (final InterruptedException e) { + return false; + } + + return true; + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/package-info.java new file mode 100644 index 000000000..dc0b9ee40 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/threading/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Provides factories and utility functions for threads. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ + +package org.onap.policy.apex.core.infrastructure.threading; diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/XPathReader.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/XPathReader.java new file mode 100644 index 000000000..f677e6b4a --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/XPathReader.java @@ -0,0 +1,115 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.xml; + +import java.io.InputStream; + +import javax.xml.namespace.QName; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathExpression; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; + +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; +import org.w3c.dom.Document; + +/** + * A generic class for applying the XPATH queries on XML files. + * + * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com) + */ +public class XPathReader { + // Logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(XPathReader.class); + + private String xmlFileName = null; + private InputStream xmlStream = null; + private Document xmlDocument; + private XPath xPath; + + /** + * Construct Reader for the file passed in. + * + * @param xmlFileName the xml file name + */ + public XPathReader(final String xmlFileName) { + this.xmlFileName = xmlFileName; + init(); + } + + /** + * Construct Reader for the stream passed in. + * + * @param xmlStream a stream of XML + */ + public XPathReader(final InputStream xmlStream) { + this.xmlStream = xmlStream; + init(); + } + + /** + * Initialise the x-path reader. + */ + private void init() { + try { + LOGGER.info("Initializing XPath reader"); + + // Check if this is operating on a file + if (xmlFileName != null) { + xmlDocument = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(xmlFileName); + } + // Check if this is operating on a stream + else if (xmlStream != null) { + xmlDocument = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(xmlStream); + + } + // We have an error + else { + LOGGER.error("XPath reader not initialized with either a file or a stream"); + return; + } + + xPath = XPathFactory.newInstance().newXPath(); + LOGGER.info("Initialized XPath reader"); + } catch (final Exception ex) { + LOGGER.error("Error parsing XML file/stream from XPath reading, reason :\n" + ex.getMessage()); + } + } + + /** + * Read items from the file using xpath. + * + * @param expression x-path expression + * @param returnType XML node Set + * @return last node collected + */ + public Object read(final String expression, final QName returnType) { + try { + final XPathExpression xPathExpression = xPath.compile(expression); + return xPathExpression.evaluate(xmlDocument, returnType); + } catch (final XPathExpressionException ex) { + LOGGER.error("Failed to read XML file for XPath processing, reason:\n" + ex.getMessage()); + return null; + } + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/package-info.java new file mode 100644 index 000000000..5631bd15c --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/apex/core/infrastructure/xml/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Provides utility XML classes for use by other classes and modules in APEX. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ + +package org.onap.policy.apex.core.infrastructure.xml; diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassBuilder.java b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassBuilder.java new file mode 100644 index 000000000..5765eee01 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassBuilder.java @@ -0,0 +1,133 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.core.infrastructure.java.compile.singleclass; + +import java.util.Arrays; +import java.util.List; + +import javax.tools.Diagnostic; +import javax.tools.DiagnosticCollector; +import javax.tools.JavaCompiler; +import javax.tools.JavaFileObject; +import javax.tools.ToolProvider; + +import org.onap.policy.apex.core.infrastructure.java.JavaHandlingException; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class SingleClassBuilder is used to compile the Java code for a Java object and to create an instance of the + * object. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class SingleClassBuilder { + // Logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(SingleClassBuilder.class); + + // The class name and source code for the class that we are compiling and instantiating + private final String className; + private final String sourceCode; + + // This specialized JavaFileManager handles class loading for the single Java class + private SingleFileManager singleFileManager = null; + + /** + * Instantiates a new single class builder. + * + * @param className the class name + * @param sourceCode the source code + */ + public SingleClassBuilder(final String className, final String sourceCode) { + // Save the fields of the class + this.className = className; + this.sourceCode = sourceCode; + } + + /** + * Compile the single class into byte code. + * + * @throws JavaHandlingException Thrown on compilation errors or handling errors on the single Java class + */ + public void compile() throws JavaHandlingException { + // Get the list of compilation units, there is only one here + final List compilationUnits = + Arrays.asList(new SingleClassCompilationUnit(className, sourceCode)); + + // Allows us to get diagnostics from the compilation + final DiagnosticCollector diagnosticListener = new DiagnosticCollector<>(); + + // Get the Java compiler + final JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + + // Set up the target file manager and call the compiler + singleFileManager = new SingleFileManager(compiler, new SingleClassByteCodeFileObject(className)); + final JavaCompiler.CompilationTask task = + compiler.getTask(null, singleFileManager, diagnosticListener, null, null, compilationUnits); + + // Check if the compilation worked + if (!task.call()) { + final StringBuilder builder = new StringBuilder(); + for (final Diagnostic diagnostic : diagnosticListener.getDiagnostics()) { + builder.append("code:"); + builder.append(diagnostic.getCode()); + builder.append(", kind:"); + builder.append(diagnostic.getKind()); + builder.append(", position:"); + builder.append(diagnostic.getPosition()); + builder.append(", start position:"); + builder.append(diagnostic.getStartPosition()); + builder.append(", end position:"); + builder.append(diagnostic.getEndPosition()); + builder.append(", source:"); + builder.append(diagnostic.getSource()); + builder.append(", message:"); + builder.append(diagnostic.getMessage(null)); + builder.append("\n"); + } + + LOGGER.warn("error compiling Java code for class \"" + className + "\": " + builder.toString()); + throw new JavaHandlingException( + "error compiling Java code for class \"" + className + "\": " + builder.toString()); + } + } + + /** + * Create a new instance of the Java class using its byte code definition. + * + * @return A new instance of the object + * @throws InstantiationException if an instance of the object cannot be created, for example if the class has no + * default constructor + * @throws IllegalAccessException the caller does not have permission to call the class + * @throws ClassNotFoundException the byte code for the class is not found in the class loader + * @throws JavaHandlingException the java handling exception if the Java class source code is not compiled + */ + public Object createObject() + throws InstantiationException, IllegalAccessException, ClassNotFoundException, JavaHandlingException { + if (singleFileManager == null) { + LOGGER.warn("error instantiating instance for class \"" + className + "\": code may not be compiled"); + throw new JavaHandlingException( + "error instantiating instance for class \"" + className + "\": code may not be compiled"); + } + + return singleFileManager.getClassLoader(null).findClass(className).newInstance(); + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassByteCodeFileObject.java b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassByteCodeFileObject.java new file mode 100644 index 000000000..c0a2ef7a1 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassByteCodeFileObject.java @@ -0,0 +1,89 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.core.infrastructure.java.compile.singleclass; + +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; + +import javax.tools.SimpleJavaFileObject; + +/** + * The Class SingleClassByteCodeFileObject is a specialization of the {@link SimpleJavaFileObject} class, which is + * itself an implementation of the {@code JavaFileObject} interface, which provides a file abstraction for tools + * operating on Java programming language source and class files. The {@link SimpleJavaFileObject} class provides simple + * implementations for most methods in {@code JavaFileObject}. This class is designed to be sub classed and used as a + * basis for {@code JavaFileObject} implementations. Subclasses can override the implementation and specification of any + * method of this class as long as the general contract of {@code JavaFileObject} is obeyed. + * + * This class holds the byte code for a single class in memory. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class SingleClassByteCodeFileObject extends SimpleJavaFileObject { + + // The ByteArrayOutputStream holds the byte code for the class + private ByteArrayOutputStream byteArrayOutputStream; + + /** + * Instantiates the byte code for the class in memory. + * + * @param className the class name is used to compose a URI for the class + */ + public SingleClassByteCodeFileObject(final String className) { + super(URI.create("byte:///" + className + ".class"), Kind.CLASS); + } + + /* + * (non-Javadoc) + * + * @see javax.tools.SimpleJavaFileObject#openOutputStream() + */ + @Override + public OutputStream openOutputStream() { + // Create the byte array output stream that will hold the byte code for the class, when the class source code is + // compiled, this output stream is passed + // to the compiler and the byte code for the class is written into the output stream. + byteArrayOutputStream = new ByteArrayOutputStream(); + return byteArrayOutputStream; + } + + /* + * (non-Javadoc) + * + * @see javax.tools.SimpleJavaFileObject#openInputStream() + */ + @Override + public InputStream openInputStream() { + // No input stream for streaming out the byte code + return null; + } + + /** + * Gets the byte code of the class. + * + * @return the byte code of the class + */ + public byte[] getByteCode() { + return byteArrayOutputStream.toByteArray(); + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassCompilationUnit.java b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassCompilationUnit.java new file mode 100644 index 000000000..9a001d2c2 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassCompilationUnit.java @@ -0,0 +1,83 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.core.infrastructure.java.compile.singleclass; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; + +import javax.tools.SimpleJavaFileObject; + +/** + * The Class SingleClassCompilationUnit is a container for the source code of the single Java class in memory. The class + * uses a {@link String} to hold the source code. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class SingleClassCompilationUnit extends SimpleJavaFileObject { + + private final String source; + + /** + * Instantiates a new compilation unit. + * + * @param className the class name for the source code + * @param source the source code for the class + */ + public SingleClassCompilationUnit(final String className, final String source) { + // Create a URI for the source code of the class + super(URI.create("file:///" + className + ".java"), Kind.SOURCE); + this.source = source; + } + + /* + * (non-Javadoc) + * + * @see javax.tools.SimpleJavaFileObject#getCharContent(boolean) + */ + @Override + public CharSequence getCharContent(final boolean ignoreEncodingErrors) { + // Return the source code to toe caller, the compiler + return source; + } + + /* + * (non-Javadoc) + * + * @see javax.tools.SimpleJavaFileObject#openOutputStream() + */ + @Override + public OutputStream openOutputStream() { + throw new IllegalStateException(); + } + + /* + * (non-Javadoc) + * + * @see javax.tools.SimpleJavaFileObject#openInputStream() + */ + @Override + public InputStream openInputStream() { + // Return the source code as a stream + return new ByteArrayInputStream(source.getBytes()); + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassLoader.java b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassLoader.java new file mode 100644 index 000000000..befed6d40 --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleClassLoader.java @@ -0,0 +1,61 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.core.infrastructure.java.compile.singleclass; + +/** + * The Class SingleClassLoader is responsible for class loading the single Java class being held in memory. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class SingleClassLoader extends ClassLoader { + // The byte code of the class held in memory as byte code in a ByteCodeFileObject + private final SingleClassByteCodeFileObject byteCodeFileObject; + + /** + * Instantiates a new single class loader to load the byte code of the class that is being held in memory. + * + * @param byteCodeFileObject the byte code of the class + */ + public SingleClassLoader(final SingleClassByteCodeFileObject byteCodeFileObject) { + this.byteCodeFileObject = byteCodeFileObject; + } + + /* + * (non-Javadoc) + * + * @see java.lang.ClassLoader#findClass(java.lang.String) + */ + @Override + protected Class findClass(final String className) throws ClassNotFoundException { + // Creates a java Class that can be instantiated from the class defined in the byte code in the + // ByteCodeFileObejct + return defineClass(className, byteCodeFileObject.getByteCode(), 0, byteCodeFileObject.getByteCode().length); + } + + /** + * Gets the file object. + * + * @return the file object + */ + SingleClassByteCodeFileObject getFileObject() { + return byteCodeFileObject; + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleFileManager.java b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleFileManager.java new file mode 100644 index 000000000..ce92f8e4d --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/SingleFileManager.java @@ -0,0 +1,79 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.core.infrastructure.java.compile.singleclass; + +import java.io.IOException; + +import javax.tools.FileObject; +import javax.tools.ForwardingJavaFileManager; +import javax.tools.JavaCompiler; +import javax.tools.JavaFileObject; +import javax.tools.StandardJavaFileManager; + +/** + * The Class SingleFileManager is a {@link ForwardingJavaFileManager} which in turn implements {@code JavaFileManager}. + * A {@code JavaFileManager} handles source files for Java language handling tools. A {@link ForwardingJavaFileManager} + * is an implementation of {@code JavaFileManager} that forwards the {@code JavaFileManager} methods to a given file + * manager. + * + * This class instantiates and forwards those requests to a {@link StandardJavaFileManager} instance to act as a + * {@code JavaFileManager} for a Java single file, managing class loading for the class. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class SingleFileManager extends ForwardingJavaFileManager { + // THe class loader for our single class + private final SingleClassLoader singleClassLoader; + + /** + * Instantiates a new single file manager. + * + * @param compiler the compiler we are using + * @param byteCodeFileObject the byte code for the compiled class + */ + public SingleFileManager(final JavaCompiler compiler, final SingleClassByteCodeFileObject byteCodeFileObject) { + super(compiler.getStandardFileManager(null, null, null)); + singleClassLoader = new SingleClassLoader(byteCodeFileObject); + } + + /* + * (non-Javadoc) + * + * @see javax.tools.ForwardingJavaFileManager#getJavaFileForOutput(javax.tools.JavaFileManager.Location, + * java.lang.String, javax.tools.JavaFileObject.Kind, javax.tools.FileObject) + */ + @Override + public JavaFileObject getJavaFileForOutput(final Location notUsed, final String className, + final JavaFileObject.Kind kind, final FileObject sibling) throws IOException { + // Return the JavaFileObject to the compiler so that it can write byte code into it + return singleClassLoader.getFileObject(); + } + + /* + * (non-Javadoc) + * + * @see javax.tools.ForwardingJavaFileManager#getClassLoader(javax.tools.JavaFileManager.Location) + */ + @Override + public SingleClassLoader getClassLoader(final Location location) { + return singleClassLoader; + } +} diff --git a/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/package-info.java b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/package-info.java new file mode 100644 index 000000000..5e436818c --- /dev/null +++ b/core/core-infrastructure/src/main/java/org/onap/policy/core/infrastructure/java/compile/singleclass/package-info.java @@ -0,0 +1,28 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Generates classes from source code by compiling source code and placing the resultant classes on the class path on + * the fly. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ + +package org.onap.policy.core.infrastructure.java.compile.singleclass; diff --git a/core/core-infrastructure/src/main/resources/logback.xml b/core/core-infrastructure/src/main/resources/logback.xml new file mode 100644 index 000000000..b1d855419 --- /dev/null +++ b/core/core-infrastructure/src/main/resources/logback.xml @@ -0,0 +1,50 @@ + + + + + + Apex + + + + + + + %d %contextName [%t] %level %logger{36} - %msg%n + + + + + + + + + ${VAR_LOG}/apex.log + + %d %-5relative [procId=${processId}] [%thread] %-5level + %logger{26} - %msg %n %ex{full} + + + + + + + diff --git a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/EndToEndStringMessagingTest.java b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/EndToEndStringMessagingTest.java new file mode 100644 index 000000000..e18c327c0 --- /dev/null +++ b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/EndToEndStringMessagingTest.java @@ -0,0 +1,87 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.junit.Test; +import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageClient; +import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageListener; +import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageServer; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class EndToEndMessagingTest. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class EndToEndStringMessagingTest { + // Logger for this class + private static final XLogger logger = XLoggerFactory.getXLogger(EndToEndStringMessagingTest.class); + + private WSStringMessageServer server; + private WSStringMessageClient client; + + private boolean finished = false; + + @Test + public void testEndToEndMessaging() throws MessagingException { + logger.debug("end to end messaging test starting . . ."); + server = new WSStringMessageServer(44441); + assertNotNull(server); + server.start(new WSStringServerMessageListener()); + + client = new WSStringMessageClient("localhost", 44441); + assertNotNull(client); + client.start(new WSStringClientMessageListener()); + + client.sendString("Hello, client here"); + + while (!finished) { + ThreadUtilities.sleep(50); + } + client.stop(); + + server.stop(); + logger.debug("end to end messaging test finished"); + } + + private class WSStringServerMessageListener implements WSStringMessageListener { + @Override + public void receiveString(final String stringMessage) { + logger.debug(stringMessage); + assertEquals("Hello, client here", stringMessage); + server.sendString("Hello back from server"); + } + } + + private class WSStringClientMessageListener implements WSStringMessageListener { + @Override + public void receiveString(final String stringMessage) { + logger.debug(stringMessage); + assertEquals("Hello back from server", stringMessage); + finished = true; + } + } +} diff --git a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/StringTestServer.java b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/StringTestServer.java new file mode 100644 index 000000000..09fa62d59 --- /dev/null +++ b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/StringTestServer.java @@ -0,0 +1,83 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging; + +import static org.junit.Assert.assertNotNull; + +import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageListener; +import org.onap.policy.apex.core.infrastructure.messaging.stringmessaging.WSStringMessageServer; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; + +public class StringTestServer { + private WSStringMessageServer server; + + public StringTestServer(final int port, long timeToLive) throws MessagingException { + System.out.println("StringTestServer starting on port " + port + " for " + timeToLive + " seconds . . ."); + server = new WSStringMessageServer(port); + assertNotNull(server); + server.start(new WSStringServerMessageListener()); + + System.out.println("StringTestServer started on port " + port + " for " + timeToLive + " seconds"); + + for (; timeToLive > 0; timeToLive--) { + ThreadUtilities.sleep(1000); + } + + server.stop(); + System.out.println("StringTestServer completed"); + } + + private class WSStringServerMessageListener implements WSStringMessageListener { + @Override + public void receiveString(final String stringMessage) { + System.out.println("Server received string \"" + stringMessage + "\""); + server.sendString("Server echoing back the message: \"" + stringMessage + "\""); + } + } + + public static void main(final String[] args) throws MessagingException { + if (args.length != 2) { + System.err.println("Usage: StringTestServer port timeToLive"); + return; + } + + int port = 0; + try { + port = Integer.parseInt(args[0]); + } catch (final Exception e) { + System.err.println("Usage: StringTestServer port timeToLive"); + e.printStackTrace(); + return; + } + + long timeToLive = 0; + try { + timeToLive = Long.parseLong(args[1]); + } catch (final Exception e) { + System.err.println("Usage: StringTestServer port timeToLive"); + e.printStackTrace(); + return; + } + + new StringTestServer(port, timeToLive); + + } +} diff --git a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/TestMessageListener.java b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/TestMessageListener.java new file mode 100644 index 000000000..9e7562b5e --- /dev/null +++ b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/messaging/TestMessageListener.java @@ -0,0 +1,65 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.messaging; + +import com.google.common.eventbus.Subscribe; + +import org.onap.policy.apex.core.infrastructure.messaging.impl.ws.messageblock.MessageBlock; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The listener interface for receiving testMessage events. The class that is interested in processing a testMessage + * event implements this interface, and the object created with that class is registered with a component using the + * component's addTestMessageListener method. When the testMessage event occurs, that object's appropriate + * method is invoked. + * + */ +public abstract class TestMessageListener implements MessageListener { + + /** The Constant logger. */ + private static final XLogger logger = XLoggerFactory.getXLogger(TestMessageListener.class); + + /** + * On command. + * + * @param data the data + */ + public abstract void onCommand(MessageBlock data); + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.core.infrastructure.messaging.MessageListener#onMessage(org.onap.policy.apex.core. + * infrastructure. messaging.impl.ws.data.Data) + */ + @Subscribe + @Override + public final void onMessage(final MessageBlock data) { + if (data != null) { + if (logger.isDebugEnabled()) { + logger.debug("{} command recieved from machine {} ", data.getMessages().size(), + data.getConnection().getRemoteSocketAddress().getHostString()); + } + onCommand(data); + } + } +} diff --git a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTest.java b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTest.java new file mode 100644 index 000000000..e570ae0c7 --- /dev/null +++ b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTest.java @@ -0,0 +1,92 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.threading; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class ThreadingTest. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ThreadingTest { + + // Logger for this class + private static final XLogger logger = XLoggerFactory.getXLogger(ThreadingTest.class); + + /** + * Test thread factory initialization. + */ + @Test + public void testThreadFactoryInitialization() { + final ApplicationThreadFactory threadFactory0 = new ApplicationThreadFactory("localName", 0); + assertNotNull("Failed to create ApplicationThreadFactory threadFactory0", threadFactory0); + logger.debug(threadFactory0.toString()); + assertTrue("Failed to name ApplicationThreadFactory threadFactory0", + threadFactory0.getName().startsWith("Apex-localName")); + final ApplicationThreadFactory threadFactory1 = new ApplicationThreadFactory("localName", 0); + assertNotNull("Failed to create ApplicationThreadFactory threadFactory1", threadFactory1); + logger.debug(threadFactory1.toString()); + assertTrue("Failed to name ApplicationThreadFactory threadFactory1", + threadFactory1.getName().startsWith("Apex-localName")); + + testThreadFactory(threadFactory0, 0); + testThreadFactory(threadFactory1, 1); + } + + /** + * Test thread factory. + * + * @param threadFactory the thread factory + * @param factoryId the factory id + */ + private void testThreadFactory(final ApplicationThreadFactory threadFactory, final int factoryId) { + final List threadList = new ArrayList(); + + for (int i = 0; i < 5; i++) { + threadList.add(new ThreadingTestThread()); + threadList.get(i).setThread(threadFactory.newThread(threadList.get(i))); + assertTrue(threadList.get(i).getName().startsWith("Apex-localName")); + assertTrue(threadList.get(i).getName().contains(":" + i)); + threadList.get(i).getThread().start(); + } + + // Threads should need a little more than 300ms to count to 3 + ThreadUtilities.sleep(380); + + for (int i = 0; i < 5; i++) { + threadList.get(i).interrupt(); + } + + for (int i = 0; i < 5; i++) { + assertTrue("Thread (" + i + ") failed to get count (" + threadList.get(i).getCounter() + ") up to 3", + threadList.get(i).getCounter() == 3); + } + } +} diff --git a/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTestThread.java b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTestThread.java new file mode 100644 index 000000000..195072886 --- /dev/null +++ b/core/core-infrastructure/src/test/java/org/onap/policy/apex/core/infrastructure/threading/ThreadingTestThread.java @@ -0,0 +1,110 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.core.infrastructure.threading; + +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class ThreadingTestThread. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ThreadingTestThread implements Runnable { + + // Logger for this class + private static final XLogger logger = XLoggerFactory.getXLogger(ThreadingTestThread.class); + + private boolean interrupted = false; + + private long counter = -1; + + private Thread thread = null; + + /** + * Sets the thread. + * + * @param thread the new thread + */ + public void setThread(final Thread thread) { + this.thread = thread; + } + + /** + * Gets the thread. + * + * @return the thread + */ + public Thread getThread() { + return thread; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + if (logger.isDebugEnabled()) { + logger.debug("starting threading test thread \"" + thread.getName() + "\" . . ."); + } + + while (!interrupted) { + counter++; + if (logger.isDebugEnabled()) { + logger.debug("in threading test thread \"" + thread.getName() + "\", counter=" + counter + " . . ."); + } + + if (!ThreadUtilities.sleep(100)) { + interrupted = true; + } + } + if (logger.isDebugEnabled()) { + logger.debug("stopped threading test thread \"" + thread.getName() + "\""); + } + } + + /** + * Gets the name. + * + * @return the name + */ + public String getName() { + return thread.getName(); + } + + /** + * Interrupt. + */ + public void interrupt() { + interrupted = true; + } + + /** + * Gets the counter. + * + * @return the counter + */ + public Long getCounter() { + return counter; + } +} diff --git a/core/core-infrastructure/src/test/resources/logback-test.xml b/core/core-infrastructure/src/test/resources/logback-test.xml new file mode 100644 index 000000000..034511335 --- /dev/null +++ b/core/core-infrastructure/src/test/resources/logback-test.xml @@ -0,0 +1,70 @@ + + + + + + Apex + + + + + + + %d %contextName [%t] %level %logger{36} - %msg%n + + + + + + + + + + + + + + + + + ${LOG_DIR}/apex.log + + %d %-5relative [procId=${processId}] [%thread] %-5level + %logger{26} - %msg %n %ex{full} + + + + + ${LOG_DIR}/apex_ctxt.log + + %d %-5relative [procId=${processId}] [%thread] %-5level + %logger{26} - %msg %n %ex{full} + + + + + + + + + + + -- cgit 1.2.3-korg